# Source code for langchain_core.runnables.passthrough
"""Implementation of the RunnablePassthrough."""from__future__importannotationsimportasyncioimportinspectimportthreadingfromcollections.abcimportAwaitablefromtypingimport(TYPE_CHECKING,Any,Callable,Optional,Union,cast,)frompydanticimportBaseModel,RootModelfromtyping_extensionsimportoverridefromlangchain_core.runnables.baseimport(Other,Runnable,RunnableParallel,RunnableSerializable,)fromlangchain_core.runnables.configimport(RunnableConfig,acall_func_with_variable_args,call_func_with_variable_args,ensure_config,get_executor_for_config,patch_config,)fromlangchain_core.runnables.utilsimport(AddableDict,ConfigurableFieldSpec,)fromlangchain_core.utils.aiterimportatee,py_anextfromlangchain_core.utils.iterimportsafeteefromlangchain_core.utils.pydanticimportcreate_model_v2ifTYPE_CHECKING:fromcollections.abcimportAsyncIterator,Iterator,Mappingfromlangchain_core.callbacks.managerimport(AsyncCallbackManagerForChainRun,CallbackManagerForChainRun,)fromlangchain_core.runnables.graphimportGraph
def identity(x: Other) -> Other:
    """Return the input unchanged.

    Used as the no-op transformation that `RunnablePassthrough` hands to the
    stream helpers.

    Args:
        x: The value to pass through.

    Returns:
        The very same object that was passed in.
    """
    return x
class RunnablePassthrough(RunnableSerializable[Other, Other]):
    """Runnable to passthrough inputs unchanged or with additional keys.

    This Runnable behaves almost like the identity function, except that it
    can be configured to add additional keys to the output, if the input is a
    dict.

    The examples below demonstrate how this Runnable works using a few simple
    chains. The chains rely on simple lambdas to make the examples easy to
    execute and experiment with.

    Parameters:
        func (Callable[[Other], None], optional): Function to be called with the
            input.
        afunc (Callable[[Other], Awaitable[None]], optional): Async function to
            be called with the input.
        input_type (Optional[Type[Other]], optional): Type of the input.
        **kwargs (Any): Additional keyword arguments.

    Examples:

        .. code-block:: python

            from langchain_core.runnables import (
                RunnableLambda,
                RunnableParallel,
                RunnablePassthrough,
            )

            runnable = RunnableParallel(
                origin=RunnablePassthrough(),
                modified=lambda x: x+1
            )

            runnable.invoke(1) # {'origin': 1, 'modified': 2}


            def fake_llm(prompt: str) -> str: # Fake LLM for the example
                return "completion"

            chain = RunnableLambda(fake_llm) | {
                'original': RunnablePassthrough(), # Original LLM output
                'parsed': lambda text: text[::-1] # Parsing logic
            }

            chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}

    In some cases, it may be useful to pass the input through while adding some
    keys to the output. In this case, you can use the `assign` method:

        .. code-block:: python

            from langchain_core.runnables import RunnablePassthrough

            def fake_llm(prompt: str) -> str: # Fake LLM for the example
                return "completion"

            runnable = {
                'llm1': fake_llm,
                'llm2': fake_llm,
            } | RunnablePassthrough.assign(
                total_chars=lambda inputs: len(inputs['llm1'] + inputs['llm2'])
            )

            runnable.invoke('hello')
            # {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}
    """

    # Declared input type; also reported as the output type (see
    # `InputType` / `OutputType`), falling back to `Any` when unset.
    input_type: Optional[type[Other]] = None

    # Optional sync side-effect callback; may also accept the RunnableConfig.
    func: Optional[
        Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]]
    ] = None

    # Optional async side-effect callback; may also accept the RunnableConfig.
    afunc: Optional[
        Union[
            Callable[[Other], Awaitable[None]],
            Callable[[Other, RunnableConfig], Awaitable[None]],
        ]
    ] = None

    def __repr_args__(self) -> Any:
        # Without this repr(self) raises a RecursionError
        # See https://github.com/pydantic/pydantic/issues/7327
        return []

    def __init__(
        self,
        func: Optional[
            Union[
                Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]],
                Union[
                    Callable[[Other], Awaitable[None]],
                    Callable[[Other, RunnableConfig], Awaitable[None]],
                ],
            ]
        ] = None,
        afunc: Optional[
            Union[
                Callable[[Other], Awaitable[None]],
                Callable[[Other, RunnableConfig], Awaitable[None]],
            ]
        ] = None,
        *,
        input_type: Optional[type[Other]] = None,
        **kwargs: Any,
    ) -> None:
        """Create the passthrough.

        Args:
            func: Callback to run with the input. If a coroutine function is
                given here it is stored as `afunc` instead.
            afunc: Async callback to run with the input.
            input_type: Type of the input.
            **kwargs: Additional keyword arguments forwarded to the parent
                initializer.
        """
        # A coroutine function passed positionally as `func` is really the
        # async callback: move it over and leave the sync slot empty.
        if inspect.iscoroutinefunction(func):
            afunc = func
            func = None

        super().__init__(func=func, afunc=afunc, input_type=input_type, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is serializable (always True)."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    @property
    @override
    def InputType(self) -> Any:
        """Return `input_type` if it was set, otherwise `Any`."""
        return self.input_type or Any

    @property
    @override
    def OutputType(self) -> Any:
        """Return `input_type` if it was set, otherwise `Any`."""
        return self.input_type or Any

    @classmethod
    def assign(
        cls,
        **kwargs: Union[
            Runnable[dict[str, Any], Any],
            Callable[[dict[str, Any]], Any],
            Mapping[
                str,
                Union[Runnable[dict[str, Any], Any], Callable[[dict[str, Any]], Any]],
            ],
        ],
    ) -> RunnableAssign:
        """Merge the Dict input with the output produced by the mapping argument.

        Args:
            **kwargs: Runnable, Callable or a Mapping from keys to Runnables
                or Callables.

        Returns:
            A Runnable that merges the Dict input with the output produced by the
            mapping argument.
        """
        return RunnableAssign(RunnableParallel[dict[str, Any]](kwargs))

    def transform(
        self,
        input: Iterator[Other],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Other]:
        """Stream the input chunks through unchanged, then call `func` once.

        Args:
            input: Iterator of input chunks.
            config: Optional runnable config.
            **kwargs: Extra keyword arguments forwarded to `func`.

        Yields:
            Every input chunk, as received.
        """
        if self.func is None:
            yield from self._transform_stream_with_config(input, identity, config)
        else:
            final: Other
            got_first_chunk = False

            for chunk in self._transform_stream_with_config(input, identity, config):
                yield chunk

                # The callback operates on the aggregated input, so chunks are
                # added together as they arrive. If they cannot be added, only
                # the most recent chunk is kept.
                if not got_first_chunk:
                    final = chunk
                    got_first_chunk = True
                else:
                    try:
                        final = final + chunk  # type: ignore[operator]
                    except TypeError:
                        final = chunk

            # Skip the callback entirely when the stream was empty.
            if got_first_chunk:
                call_func_with_variable_args(
                    self.func, final, ensure_config(config), **kwargs
                )

    async def atransform(
        self,
        input: AsyncIterator[Other],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Other]:
        """Async variant of `transform`.

        After the stream is exhausted, `afunc` is awaited if set; otherwise
        `func` is called synchronously if set.

        Args:
            input: Async iterator of input chunks.
            config: Optional runnable config.
            **kwargs: Extra keyword arguments forwarded to the callback.

        Yields:
            Every input chunk, as received.
        """
        if self.afunc is None and self.func is None:
            async for chunk in self._atransform_stream_with_config(
                input, identity, config
            ):
                yield chunk
        else:
            got_first_chunk = False

            async for chunk in self._atransform_stream_with_config(
                input, identity, config
            ):
                yield chunk

                # By definitions, a function will operate on the aggregated
                # input. So we'll aggregate the input until we get to the last
                # chunk.
                # If the input is not addable, then we'll assume that we can
                # only operate on the last chunk.
                if not got_first_chunk:
                    final = chunk
                    got_first_chunk = True
                else:
                    try:
                        final = final + chunk  # type: ignore[operator]
                    except TypeError:
                        final = chunk

            # Skip the callback entirely when the stream was empty.
            if got_first_chunk:
                config = ensure_config(config)
                if self.afunc is not None:
                    await acall_func_with_variable_args(
                        self.afunc, final, config, **kwargs
                    )
                elif self.func is not None:
                    call_func_with_variable_args(self.func, final, config, **kwargs)
class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
    """Runnable that assigns key-value pairs to Dict[str, Any] inputs.

    The `RunnableAssign` class takes input dictionaries and, through a
    `RunnableParallel` instance, applies transformations, then combines
    these with the original data, introducing new key-value pairs based
    on the mapper's logic.

    Parameters:
        mapper (RunnableParallel[Dict[str, Any]]): A `RunnableParallel` instance
            that will be used to transform the input dictionary.

    Examples:
        .. code-block:: python

            # This is a RunnableAssign
            from typing import Dict
            from langchain_core.runnables.passthrough import (
                RunnableAssign,
                RunnableParallel,
            )
            from langchain_core.runnables.base import RunnableLambda

            def add_ten(x: Dict[str, int]) -> Dict[str, int]:
                return {"added": x["input"] + 10}

            mapper = RunnableParallel(
                {"add_step": RunnableLambda(add_ten),}
            )

            runnable_assign = RunnableAssign(mapper)

            # Synchronous example
            runnable_assign.invoke({"input": 5})
            # returns {'input': 5, 'add_step': {'added': 15}}

            # Asynchronous example
            await runnable_assign.ainvoke({"input": 5})
            # returns {'input': 5, 'add_step': {'added': 15}}
    """

    # Parallel runnable whose output is merged on top of the input dict.
    mapper: RunnableParallel

    def __init__(self, mapper: RunnableParallel[dict[str, Any]], **kwargs: Any) -> None:
        """Create the assigner.

        Args:
            mapper: Runnable that computes the keys to add to the input.
            **kwargs: Additional keyword arguments forwarded to the parent
                initializer.
        """
        super().__init__(mapper=mapper, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is serializable (always True)."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Return the runnable's name.

        Falls back to ``RunnableAssign<key1,key2,...>`` built from the mapper's
        step keys when neither `name` nor `self.name` is set.
        """
        name = (
            name
            or self.name
            or f"RunnableAssign<{','.join(self.mapper.steps__.keys())}>"
        )
        return super().get_name(suffix, name=name)

    def get_input_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> type[BaseModel]:
        """Return the mapper's input schema when it is not a `RootModel`.

        Otherwise defer to the parent implementation.
        """
        map_input_schema = self.mapper.get_input_schema(config)
        if not issubclass(map_input_schema, RootModel):
            # ie. it's a dict
            return map_input_schema

        return super().get_input_schema(config)

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> type[BaseModel]:
        """Return a schema combining the mapper's input and output fields.

        When both mapper schemas are non-`RootModel`, their fields are merged
        into a new ``RunnableAssignOutput`` model (output fields win on name
        clashes). When only the output schema is non-`RootModel`, it is
        returned as is. Otherwise defer to the parent implementation.
        """
        map_input_schema = self.mapper.get_input_schema(config)
        map_output_schema = self.mapper.get_output_schema(config)
        if not issubclass(map_input_schema, RootModel) and not issubclass(
            map_output_schema, RootModel
        ):
            fields = {}

            for name, field_info in map_input_schema.model_fields.items():
                fields[name] = (field_info.annotation, field_info.default)

            # Added second so mapper outputs override same-named input fields.
            for name, field_info in map_output_schema.model_fields.items():
                fields[name] = (field_info.annotation, field_info.default)

            return create_model_v2(  # type: ignore[call-overload]
                "RunnableAssignOutput", field_definitions=fields
            )
        elif not issubclass(map_output_schema, RootModel):
            # ie. only map output is a dict
            # ie. input type is either unknown or inferred incorrectly
            return map_output_schema

        return super().get_output_schema(config)

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Return the mapper's configurable field specs."""
        return self.mapper.config_specs

    def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
        """Return the mapper's graph with an extra passthrough node.

        The passthrough node is wired from the graph's first node to its last
        node, when both exist.
        """
        # get graph from mapper
        graph = self.mapper.get_graph(config)
        # add passthrough node and edges
        input_node = graph.first_node()
        output_node = graph.last_node()
        if input_node is not None and output_node is not None:
            passthrough_node = graph.add_node(_graph_passthrough)
            graph.add_edge(input_node, passthrough_node)
            graph.add_edge(passthrough_node, output_node)
        return graph

    def _invoke(
        self,
        input: dict[str, Any],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Return the input dict merged with the mapper's output.

        Raises:
            ValueError: If `input` is not a dict.
        """
        if not isinstance(input, dict):
            msg = "The input to RunnablePassthrough.assign() must be a dict."
            raise ValueError(msg)  # noqa: TRY004

        # Mapper output is unpacked last so it overrides same-named input keys.
        return {
            **input,
            **self.mapper.invoke(
                input,
                patch_config(config, callbacks=run_manager.get_child()),
                **kwargs,
            ),
        }

    async def _ainvoke(
        self,
        input: dict[str, Any],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Async variant of `_invoke`.

        Raises:
            ValueError: If `input` is not a dict.
        """
        if not isinstance(input, dict):
            msg = "The input to RunnablePassthrough.assign() must be a dict."
            raise ValueError(msg)  # noqa: TRY004

        # Mapper output is unpacked last so it overrides same-named input keys.
        return {
            **input,
            **await self.mapper.ainvoke(
                input,
                patch_config(config, callbacks=run_manager.get_child()),
                **kwargs,
            ),
        }

    def _transform(
        self,
        input: Iterator[dict[str, Any]],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[dict[str, Any]]:
        """Stream passthrough chunks first, then the mapper's output chunks.

        Raises:
            ValueError: If an input chunk is not a dict.
        """
        # collect mapper keys
        mapper_keys = set(self.mapper.steps__.keys())
        # create two streams, one for the map and one for the passthrough
        for_passthrough, for_map = safetee(input, 2, lock=threading.Lock())

        # create map output stream
        map_output = self.mapper.transform(
            for_map,
            patch_config(
                config,
                callbacks=run_manager.get_child(),
            ),
            **kwargs,
        )

        # get executor to start map output stream in background
        with get_executor_for_config(config) as executor:
            # start map output stream
            first_map_chunk_future = executor.submit(
                next,
                map_output,  # type: ignore
                None,
            )
            # consume passthrough stream
            for chunk in for_passthrough:
                if not isinstance(chunk, dict):
                    msg = "The input to RunnablePassthrough.assign() must be a dict."
                    raise ValueError(msg)  # noqa: TRY004

                # remove mapper keys from passthrough chunk, to be overwritten
                # by map output
                filtered = AddableDict(
                    {k: v for k, v in chunk.items() if k not in mapper_keys}
                )
                if filtered:
                    yield filtered
            # yield map output
            yield cast("dict[str, Any]", first_map_chunk_future.result())
            for chunk in map_output:
                yield chunk

    def transform(
        self,
        input: Iterator[dict[str, Any]],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[dict[str, Any]]:
        """Stream `_transform` through `_transform_stream_with_config`."""
        yield from self._transform_stream_with_config(
            input, self._transform, config, **kwargs
        )

    async def _atransform(
        self,
        input: AsyncIterator[dict[str, Any]],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> AsyncIterator[dict[str, Any]]:
        """Async variant of `_transform`.

        Raises:
            ValueError: If an input chunk is not a dict.
        """
        # collect mapper keys
        mapper_keys = set(self.mapper.steps__.keys())
        # create two streams, one for the map and one for the passthrough
        for_passthrough, for_map = atee(input, 2, lock=asyncio.Lock())
        # create map output stream
        map_output = self.mapper.atransform(
            for_map,
            patch_config(
                config,
                callbacks=run_manager.get_child(),
            ),
            **kwargs,
        )
        # start map output stream
        first_map_chunk_task: asyncio.Task = asyncio.create_task(
            py_anext(map_output, None),  # type: ignore[arg-type]
        )
        # consume passthrough stream
        async for chunk in for_passthrough:
            if not isinstance(chunk, dict):
                msg = "The input to RunnablePassthrough.assign() must be a dict."
                raise ValueError(msg)  # noqa: TRY004

            # remove mapper keys from passthrough chunk, to be overwritten by map
            # output
            filtered = AddableDict(
                {k: v for k, v in chunk.items() if k not in mapper_keys}
            )
            if filtered:
                yield filtered
        # yield map output
        yield await first_map_chunk_task
        async for chunk in map_output:
            yield chunk

    async def atransform(
        self,
        input: AsyncIterator[dict[str, Any]],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream `_atransform` through `_atransform_stream_with_config`."""
        async for chunk in self._atransform_stream_with_config(
            input, self._atransform, config, **kwargs
        ):
            yield chunk
class RunnablePick(RunnableSerializable[dict[str, Any], dict[str, Any]]):
    """Runnable that picks keys from Dict[str, Any] inputs.

    RunnablePick class represents a Runnable that selectively picks keys from a
    dictionary input. It allows you to specify one or more keys to extract
    from the input dictionary.

    When `keys` is a list, the result is a new dictionary containing only the
    selected keys that are present in the input (or None if none are present).
    When `keys` is a single string, the result is the value stored under that
    key (or None if the key is absent).

    Parameters:
        keys (Union[str, List[str]]): A single key or a list of keys to pick from
            the input dictionary.

    Example:
        .. code-block:: python

            from langchain_core.runnables.passthrough import RunnablePick

            input_data = {
                'name': 'John',
                'age': 30,
                'city': 'New York',
                'country': 'USA'
            }

            runnable = RunnablePick(keys=['name', 'age'])

            output_data = runnable.invoke(input_data)

            print(output_data)  # Output: {'name': 'John', 'age': 30}
    """

    # A single key, or the list of keys, to extract from the input dict.
    keys: Union[str, list[str]]

    def __init__(self, keys: Union[str, list[str]], **kwargs: Any) -> None:
        """Create the picker.

        Args:
            keys: A single key or a list of keys to pick.
            **kwargs: Additional keyword arguments forwarded to the parent
                initializer.
        """
        super().__init__(keys=keys, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is serializable (always True)."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Return the runnable's name.

        Falls back to ``RunnablePick<key1,key2,...>`` built from `keys` when
        neither `name` nor `self.name` is set.
        """
        name = (
            name
            or self.name
            or f"RunnablePick<{','.join([self.keys] if isinstance(self.keys, str) else self.keys)}>"  # noqa: E501
        )
        return super().get_name(suffix, name=name)

    def _pick(self, input: dict[str, Any]) -> Any:
        """Extract the configured key(s) from `input`.

        Args:
            input: The dictionary to pick from.

        Returns:
            For a single string key, the value under that key or None if it is
            missing. For a list of keys, an `AddableDict` of the keys that are
            present, or None if none of them are.

        Raises:
            ValueError: If `input` is not a dict.
        """
        if not isinstance(input, dict):
            # Name this class in the message: the check guards RunnablePick,
            # not RunnablePassthrough.assign().
            msg = "The input to RunnablePick must be a dict."
            raise ValueError(msg)  # noqa: TRY004

        if isinstance(self.keys, str):
            return input.get(self.keys)

        # Membership is already checked, so index directly.
        picked = {k: input[k] for k in self.keys if k in input}
        if picked:
            return AddableDict(picked)
        return None

    def _invoke(
        self,
        input: dict[str, Any],
    ) -> dict[str, Any]:
        """Return the result of `_pick` for `input`."""
        return self._pick(input)