RunnableGenerator
- class langchain_core.runnables.base.RunnableGenerator(transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]], atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None, *, name: str | None = None)
Runnable that runs a generator function.
RunnableGenerators can be instantiated directly or by using a generator within a sequence.
RunnableGenerators can be used to implement custom behavior, such as custom output parsers, while preserving streaming capabilities. Given a generator function with a signature Iterator[A] -> Iterator[B], wrapping it in a RunnableGenerator allows it to emit output chunks as soon as they are streamed in from the previous step.
Note that if a generator function has a signature A -> Iterator[B], such that it requires its input from the previous step to be completed before emitting chunks (e.g., most LLMs need the entire prompt available to start generating), it can instead be wrapped in a RunnableLambda.
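The difference between the two signatures can be sketched in plain Python, without LangChain. The helper names below (`slow_source`, `streaming_upper`, `blocking_reverse`) are hypothetical and exist only for illustration. The `events` log records how many upstream chunks had to be produced before each kind of function could start emitting.

```python
from typing import Iterator, List

events: List[str] = []


def slow_source() -> Iterator[str]:
    """Hypothetical upstream step that produces chunks one at a time."""
    for chunk in ["Have", " a", " nice", " day"]:
        events.append(f"produced {chunk!r}")
        yield chunk


def streaming_upper(chunks: Iterator[str]) -> Iterator[str]:
    """Iterator[A] -> Iterator[B]: emits as soon as each input chunk arrives."""
    for chunk in chunks:
        yield chunk.upper()


def blocking_reverse(text: str) -> Iterator[str]:
    """A -> Iterator[B]: needs the complete input before it can emit anything."""
    for character in text[::-1]:
        yield character


# Streaming transform: the first output is ready after a single upstream chunk.
stream = streaming_upper(slow_source())
first = next(stream)
produced_before_first_output = len(events)

# Blocking function: the caller has to drain the upstream step first.
events.clear()
full_text = "".join(slow_source())
produced_before_reverse_starts = len(events)
reversed_text = "".join(blocking_reverse(full_text))
```

In a chain, the first shape corresponds to what a RunnableGenerator wraps, and the second to what a RunnableLambda wraps.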
Here is an example to show the basic mechanics of a RunnableGenerator:
    from typing import Any, AsyncIterator, Iterator

    from langchain_core.runnables import RunnableGenerator


    def gen(input: Iterator[Any]) -> Iterator[str]:
        for token in ["Have", " a", " nice", " day"]:
            yield token


    runnable = RunnableGenerator(gen)
    runnable.invoke(None)  # "Have a nice day"
    list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
    runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


    # Async version:
    async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
        for token in ["Have", " a", " nice", " day"]:
            yield token


    runnable = RunnableGenerator(agen)
    await runnable.ainvoke(None)  # "Have a nice day"
    [p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]
RunnableGenerator makes it easy to implement custom behavior within a streaming context. Below we show an example:
    from typing import Iterator

    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import RunnableGenerator, RunnableLambda
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI()
    chant_chain = (
        ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
        | model
        | StrOutputParser()
    )


    def character_generator(input: Iterator[str]) -> Iterator[str]:
        # Prefix any token containing a comma or period with an emoji.
        for token in input:
            if "," in token or "." in token:
                yield "👏" + token
            else:
                yield token


    # A generator function in a sequence is coerced to a RunnableGenerator.
    runnable = chant_chain | character_generator
    assert type(runnable.last) is RunnableGenerator
    "".join(runnable.stream({"topic": "waste"}))  # Reduce👏, Reuse👏, Recycle👏.


    # Note that RunnableLambda can be used to delay streaming of one step in a
    # sequence until the previous step is finished:
    def reverse_generator(input: str) -> Iterator[str]:
        # Yield characters of input in reverse order.
        for character in input[::-1]:
            yield character


    runnable = chant_chain | RunnableLambda(reverse_generator)
    "".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"
Initialize a RunnableGenerator.
- Parameters:
transform (Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – The transform function.
atransform (Optional[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – The async transform function. Defaults to None.
name (str | None)
- Raises:
TypeError – If the transform is not a generator function.
Note
RunnableGenerator implements the standard Runnable Interface. The Runnable Interface has additional methods that are available on runnables, such as with_types, with_retry, assign, bind, get_graph, and more.
Attributes
Methods
__init__(transform, atransform=None, *, name=None) | Initialize self.
Examples using RunnableGenerator