LangGraph runtime¶
LangGraph 运行时 ¶
Pregel implements LangGraph's runtime, managing the execution of LangGraph applications.
Pregel 实现 LangGraph 的运行时,管理 LangGraph 应用程序的执行。
Compiling a StateGraph or creating an entrypoint produces a Pregel instance that can be invoked with input.
编译 StateGraph 或创建入口点会生成一个 Pregel 实例,可以通过输入调用该实例。
This guide explains the runtime at a high level and provides instructions for directly implementing applications with Pregel.
本指南从高层次解释了运行时,并提供了使用 Pregel 直接实现应用程序的说明。
Note: The Pregel runtime is named after Google's Pregel algorithm, which describes an efficient method for large-scale parallel computation using graphs.
Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图形进行大规模并行计算的有效方法。
Overview¶ 概述 ¶
In LangGraph, Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.
在 LangGraph 中,Pregel 将演员和通道组合到一个应用程序中。Actor 从通道读取数据并将数据写入通道。Pregel 将应用程序的执行组织成多个步骤,遵循 Pregel 算法 / 批量同步并行模型。
Each step consists of three phases:
每个步骤包括三个阶段:
- Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
计划 :确定在此步骤中执行哪些参与者 。例如,在第一步中,选择订阅特殊输入通道的参与者 ;在后续步骤中,选择订阅在上一步中更新的通道的参与者 。 - Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
执行 :并行执行所有选定的参与者 ,直到所有完成,或一个失败,或超时。在此阶段,通道更新对参与者不可见,直到下一步。 - Update: Update the channels with the values written by the actors in this step.
更新 :使用参与者在此步骤中编写的值更新通道。
Repeat until no actors are selected for execution, or a maximum number of steps is reached.
重复此操作,直到没有选择执行元,或者达到最大步骤数。
Actors¶ 演员 ¶
An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.
演员是一个 PregelNode。它订阅通道,从通道读取数据,并向通道写入数据。它可以被认为是 Pregel 算法中的演员 。PregelNodes 实现了 LangChain 的 Runnable 接口。
Channels¶ 通道 ¶
Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function – which takes a sequence of updates and modifies the stored value.
通道用于参与者之间的通信(PregelNode)。每个通道都有一个值类型、一个更新类型和一个更新函数,更新函数接受一系列更新并修改存储的值。
Channels can be used to send data from one chain to another, or to send data from a chain to itself in a future step. LangGraph provides a number of built-in channels:
通道可用于将数据从一个链发送到另一个链,或者在未来的步骤中将数据从一个链发送到其本身。LangGraph 提供了许多内置通道:
- LastValue: The default channel, stores the last value sent to the channel, useful for input and output values, or for sending data from one step to the next.
LastValue:默认通道,存储发送到通道的最后一个值,用于输入和输出值,或用于将数据从一个步骤发送到下一个步骤。 - Topic: A configurable PubSub Topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values or to accumulate values over the course of multiple steps.
Topic:一个可配置的 PubSub Topic,用于在参与者之间发送多个值,或累积输出。可以配置为消除重复值或在多个步骤的过程中累积值。 - BinaryOperatorAggregate: stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel, useful for computing aggregates over multiple steps; e.g.,
total = BinaryOperatorAggregate(int, operator.add)
BinaryOperatorAggregate:存储持久值,通过将二元运算符应用于当前值和发送到通道的每个更新来更新,用于计算多个步骤的聚合;例如,total = BinaryOperatorAggregate(int, operator.add)
Examples¶ 示例 ¶
While most users will interact with Pregel through the StateGraph API or
the entrypoint decorator, it is possible to interact with Pregel directly.
虽然大多数用户将通过 StateGraphAPI 或入口点装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
Below are a few different examples to give you a sense of the Pregel API.
下面是几个不同的例子,让您对 Pregel API 有一个了解。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
app.invoke({"a": "foo"})
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
)
app.invoke({"a": "foo"})
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": Topic(str, accumulate=True),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
This examples demonstrates how to use the BinaryOperatorAggregate channel to implement a reducer.
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
def reducer(current, update):
if current:
return current + " | " + update
else:
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
This example demonstrates how to introduce a cycle in the graph, by having a chain write to a channel it subscribes to. Execution will continue until a None value is written to the channel.
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
example_node = (
NodeBuilder().subscribe_only("value")
.do(lambda x: x + x if len(x) < 10 else None)
.write_to(ChannelWriteEntry("value", skip_none=True))
)
app = Pregel(
nodes={"example_node": example_node},
channels={
"value": EphemeralValue(str),
},
input_channels=["value"],
output_channels=["value"],
)
app.invoke({"value": "a"})
High-level API¶
高级 API¶
LangGraph provides two high-level APIs for creating a Pregel application: the StateGraph (Graph API) and the Functional API.
LangGraph 为创建 Pregel 应用程序提供了两个高级 API:StateGraph(Graph API) 和 Functional API。
The StateGraph (Graph API) is a higher-level abstraction that simplifies the creation of Pregel applications. It allows you to define a graph of nodes and edges. When you compile the graph, the StateGraph API automatically creates the Pregel application for you.
StateGraph(Graph API) 是一个高级抽象,它简化了 Pregel 应用程序的创建。它允许您定义节点和边的图形。编译图形时,StateGraph API 会自动为您创建 Pregel 应用程序。
from typing import TypedDict, Optional
from langgraph.constants import START
from langgraph.graph import StateGraph
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
def score_essay(essay: Essay):
return {
"score": 10
}
builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
The compiled Pregel instance will be associated with a list of nodes and channels. You can inspect the nodes and channels by printing them.
编译后的 Pregel 实例将与节点和通道列表相关联。您可以通过打印节点和通道来检查它们。
You will see something like this:
你会看到这样的东西:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
You should see something like this
你应该看看这个
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
In the Functional API, you can use an entrypoint to create
a Pregel application. The entrypoint decorator allows you to define a function that takes input and returns output.
from typing import TypedDict, Optional
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}