# langchain (Part 1)
### Basic example

```
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI

prompt_template = "Tell me a {adjective} joke"
prompt = PromptTemplate(
    input_variables=["adjective"],
    template=prompt_template,
)
llm = OpenAI()

chain = prompt | llm | StrOutputParser()
chain.invoke("your adjective here")
```

### Pipe composition

These objects can be composed with the pipe operator because they all inherit from the `Runnable` class, which implements the `__or__` method.
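To make the mechanism concrete, here is a minimal sketch of how implementing `__or__` enables `|` composition. This is a toy stand-in, not LangChain's actual code; `MiniRunnable` and its lambdas are invented for illustration:

```
# Toy illustration of pipe composition via __or__ (not LangChain's real code).
class MiniRunnable:
    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def __or__(self, other):
        # `self | other` returns a new runnable that feeds self's
        # output into other's input.
        return MiniRunnable(lambda value: other.invoke(self.invoke(value)))


add_one = MiniRunnable(lambda x: x + 1)
mul_two = MiniRunnable(lambda x: x * 2)

chain = add_one | mul_two  # Python calls add_one.__or__(mul_two)
print(chain.invoke(1))     # 4
```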
For reference, here is the `Runnable` class docstring from `langchain_core`:

```
class Runnable(Generic[Input, Output], ABC):
    """A unit of work that can be invoked, batched, streamed, transformed and composed.

    Key Methods
    ===========

    - **invoke/ainvoke**: Transforms a single input into an output.
    - **batch/abatch**: Efficiently transforms multiple inputs into outputs.
    - **stream/astream**: Streams output from a single input as it's produced.
    - **astream_log**: Streams output and selected intermediate results from an input.

    Built-in optimizations:

    - **Batch**: By default, batch runs invoke() in parallel using a thread pool
      executor. Override to optimize batching.

    - **Async**: Methods with "a" suffix are asynchronous. By default, they execute
      the sync counterpart using asyncio's thread pool. Override for native async.

    All methods accept an optional config argument, which can be used to configure
    execution, add tags and metadata for tracing and debugging etc.

    Runnables expose schematic information about their input, output and config via
    the input_schema property, the output_schema property and config_schema method.

    LCEL and Composition
    ====================

    The LangChain Expression Language (LCEL) is a declarative way to compose
    Runnables into chains. Any chain constructed this way will automatically have
    sync, async, batch, and streaming support.

    The main composition primitives are RunnableSequence and RunnableParallel.

    **RunnableSequence** invokes a series of runnables sequentially, with
    one Runnable's output serving as the next's input. Construct using
    the `|` operator or by passing a list of runnables to RunnableSequence.

    **RunnableParallel** invokes runnables concurrently, providing the same input
    to each. Construct it using a dict literal within a sequence or by passing a
    dict to RunnableParallel.

    For example,

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda

        # A RunnableSequence constructed using the `|` operator
        sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
        sequence.invoke(1)  # 4
        sequence.batch([1, 2, 3])  # [4, 6, 8]

        # A sequence that contains a RunnableParallel constructed using a dict literal
        sequence = RunnableLambda(lambda x: x + 1) | {
            'mul_2': RunnableLambda(lambda x: x * 2),
            'mul_5': RunnableLambda(lambda x: x * 5)
        }
        sequence.invoke(1)  # {'mul_2': 4, 'mul_5': 10}

    Standard Methods
    ================

    All Runnables expose additional methods that can be used to modify their
    behavior (e.g., add a retry policy, add lifecycle listeners, make them
    configurable, etc.).

    These methods will work on any Runnable, including Runnable chains constructed
    by composing other Runnables. See the individual methods for details.

    For example,

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda

        import random

        def add_one(x: int) -> int:
            return x + 1

        def buggy_double(y: int) -> int:
            '''Buggy code that will fail 70% of the time'''
            if random.random() > 0.3:
                print('This code failed, and will probably be retried!')  # noqa: T201
                raise ValueError('Triggered buggy code')
            return y * 2

        sequence = (
            RunnableLambda(add_one) |
            RunnableLambda(buggy_double).with_retry(  # Retry on failure
                stop_after_attempt=10,
                wait_exponential_jitter=False
            )
        )

        print(sequence.input_schema.schema())   # Show inferred input schema
        print(sequence.output_schema.schema())  # Show inferred output schema
        print(sequence.invoke(2))  # invoke the sequence (note the retry above!!)

    Debugging and tracing
    =====================

    As the chains get longer, it can be useful to be able to see intermediate
    results to debug and trace the chain.

    You can set the global debug flag to True to enable debug output for all chains:

    .. code-block:: python

        from langchain_core.globals import set_debug
        set_debug(True)

    Alternatively, you can pass existing or custom callbacks to any given chain:

    .. code-block:: python

        from langchain_core.tracers import ConsoleCallbackHandler

        chain.invoke(
            ...,
            config={'callbacks': [ConsoleCallbackHandler()]}
        )

    For a UI (and much more) checkout LangSmith: https://docs.smith.langchain.com/
    """  # noqa: E501
```

The result of pipe composition is a `RunnableSequence`. This class also implements `__or__`, which lets it accept further steps (each step is itself a `Runnable`), and it has an `invoke` method that runs the whole pipeline when called.
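A quick demonstration of this, using `RunnableLambda` (the lambdas here are arbitrary placeholders):

```
from langchain_core.runnables import RunnableLambda, RunnableSequence

r1 = RunnableLambda(lambda x: x + 1)
r2 = RunnableLambda(lambda x: x * 2)

seq = r1 | r2                             # Runnable.__or__ builds a RunnableSequence
print(isinstance(seq, RunnableSequence))  # True

# RunnableSequence.__or__ accepts further steps...
longer = seq | RunnableLambda(lambda x: x - 3)

# ...and invoke() runs the whole pipeline: ((1 + 1) * 2) - 3
print(longer.invoke(1))  # 1
```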
And here is the `RunnableSequence` docstring:

```
class RunnableSequence(RunnableSerializable[Input, Output]):
    """Sequence of Runnables, where the output of each is the input of the next.

    **RunnableSequence** is the most important composition operator in LangChain
    as it is used in virtually every chain.

    A RunnableSequence can be instantiated directly or more commonly by using the
    `|` operator where either the left or right operands (or both) must be a
    Runnable.

    Any RunnableSequence automatically supports sync, async, batch.

    The default implementations of `batch` and `abatch` utilize threadpools and
    asyncio gather and will be faster than naive invocation of invoke or ainvoke
    for IO bound Runnables.

    Batching is implemented by invoking the batch method on each component of the
    RunnableSequence in order.

    A RunnableSequence preserves the streaming properties of its components, so
    if all components of the sequence implement a `transform` method -- which
    is the method that implements the logic to map a streaming input to a
    streaming output -- then the sequence will be able to stream input to output!

    If any component of the sequence does not implement transform then the
    streaming will only begin after this component is run. If there are
    multiple blocking components, streaming begins after the last one.

    Please note: RunnableLambdas do not support `transform` by default! So if
    you need to use a RunnableLambdas be careful about where you place them in a
    RunnableSequence (if you need to use the .stream()/.astream() methods).

    If you need arbitrary logic and need streaming, you can subclass
    Runnable, and implement `transform` for whatever logic you need.

    Here is a simple example that uses simple functions to illustrate the use of
    RunnableSequence:

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda

        def add_one(x: int) -> int:
            return x + 1

        def mul_two(x: int) -> int:
            return x * 2

        runnable_1 = RunnableLambda(add_one)
        runnable_2 = RunnableLambda(mul_two)
        sequence = runnable_1 | runnable_2
        # Or equivalently:
        # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
        sequence.invoke(1)
        await sequence.ainvoke(1)

        sequence.batch([1, 2, 3])
        await sequence.abatch([1, 2, 3])

    Here's an example that uses streams JSON output generated by an LLM:

    .. code-block:: python

        from langchain_core.output_parsers.json import SimpleJsonOutputParser
        from langchain_openai import ChatOpenAI

        prompt = PromptTemplate.from_template(
            'In JSON format, give me a list of {topic} and their '
            'corresponding names in French, Spanish and in a '
            'Cat Language.'
        )

        model = ChatOpenAI()
        chain = prompt | model | SimpleJsonOutputParser()

        async for chunk in chain.astream({'topic': 'colors'}):
            print('-')  # noqa: T201
            print(chunk, sep='', flush=True)  # noqa: T201
    """
```
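One consequence of the streaming note above is worth seeing directly: a chain built only from `RunnableLambda`s cannot stream incrementally, because `RunnableLambda` does not implement `transform`. A small sketch (the functions are arbitrary):

```
from langchain_core.runnables import RunnableLambda

chain = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)

# Neither step implements `transform`, so nothing is emitted until both
# have fully run; stream() then yields the final result as a single chunk.
for chunk in chain.stream(1):
    print(chunk)  # 4
```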
gaojian · 2024-08-27 13:32