class RouterInput(TypedDict):
    """Payload accepted by a router.

    Attributes:
        key: Name used to pick which Runnable handles the request.
        input: Value handed to the Runnable that was picked.
    """

    key: str
    input: Any
class RouterRunnable(RunnableSerializable[RouterInput, Output]):
    """Runnable that routes to a set of Runnables based on Input['key'].

    Returns the output of the selected Runnable. Keyword arguments given to
    any of the execution methods (``invoke``, ``ainvoke``, ``batch``,
    ``abatch``, ``stream``, ``astream``) are forwarded to the selected
    Runnable.

    Parameters:
        runnables: A mapping of keys to Runnables.

    For example,

    .. code-block:: python

        from langchain_core.runnables.router import RouterRunnable
        from langchain_core.runnables import RunnableLambda

        add = RunnableLambda(func=lambda x: x + 1)
        square = RunnableLambda(func=lambda x: x**2)

        router = RouterRunnable(runnables={"add": add, "square": square})
        router.invoke({"key": "square", "input": 3})
    """

    runnables: Mapping[str, Runnable[Any, Output]]

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Config specs of every routed Runnable, passed through
        ``get_unique_config_specs``."""
        return get_unique_config_specs(
            spec for step in self.runnables.values() for spec in step.config_specs
        )

    def __init__(
        self,
        runnables: Mapping[str, Union[Runnable[Any, Output], Callable[[Any], Output]]],
    ) -> None:
        """Create a router.

        Args:
            runnables: Mapping of routing key to a Runnable or a plain
                callable; every value is passed through
                ``coerce_to_runnable`` before being stored.
        """
        super().__init__(  # type: ignore[call-arg]
            runnables={key: coerce_to_runnable(r) for key, r in runnables.items()}
        )

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this class is serializable."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def _select(self, input: RouterInput) -> tuple[Runnable[Any, Output], Any]:
        """Split a router input into (selected runnable, payload).

        Raises:
            ValueError: If ``input["key"]`` has no associated runnable.
        """
        key = input["key"]
        actual_input = input["input"]
        if key not in self.runnables:
            msg = f"No runnable associated with key '{key}'"
            raise ValueError(msg)
        return self.runnables[key], actual_input

    def _select_many(
        self, inputs: list[RouterInput]
    ) -> tuple[list[Runnable[Any, Output]], list[Any]]:
        """Batch counterpart of ``_select``.

        All keys are validated before any runnable is looked up, so one
        unknown key rejects the entire batch.

        Raises:
            ValueError: If any input's key has no associated runnable.
        """
        keys = [item["key"] for item in inputs]
        actual_inputs = [item["input"] for item in inputs]
        if any(key not in self.runnables for key in keys):
            msg = "One or more keys do not have a corresponding runnable"
            raise ValueError(msg)
        return [self.runnables[key] for key in keys], actual_inputs

    def invoke(
        self, input: RouterInput, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Run the Runnable selected by ``input["key"]`` on ``input["input"]``.

        Args:
            input: Routing key plus the payload for the selected Runnable.
            config: Config passed through to the selected Runnable.
            **kwargs: Extra keyword arguments forwarded to the selected
                Runnable's ``invoke``.

        Returns:
            Whatever the selected Runnable returns.

        Raises:
            ValueError: If no Runnable is registered under the key.
        """
        runnable, actual_input = self._select(input)
        # Forward kwargs so invoke agrees with batch/abatch, which already do.
        return runnable.invoke(actual_input, config, **kwargs)

    async def ainvoke(
        self,
        input: RouterInput,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Async version of ``invoke``.

        Raises:
            ValueError: If no Runnable is registered under the key.
        """
        runnable, actual_input = self._select(input)
        return await runnable.ainvoke(actual_input, config, **kwargs)

    def batch(
        self,
        inputs: list[RouterInput],
        config: Optional[Union[RunnableConfig, list[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> list[Output]:
        """Route each input to its Runnable and collect the results in order.

        Args:
            inputs: Router inputs; an empty list returns ``[]`` immediately.
            config: A single config or one config per input.
            return_exceptions: When true, an exception raised by a selected
                Runnable is returned in its slot instead of propagating.
                Unknown keys still raise ``ValueError`` up front.
            **kwargs: Extra keyword arguments forwarded to every selected
                Runnable's ``invoke``.

        Raises:
            ValueError: If any input's key has no associated runnable.
        """
        if not inputs:
            return []

        runnables, actual_inputs = self._select_many(inputs)

        def invoke(
            runnable: Runnable, input: Input, config: RunnableConfig
        ) -> Union[Output, Exception]:
            if return_exceptions:
                try:
                    return runnable.invoke(input, config, **kwargs)
                except Exception as e:
                    return e
            else:
                return runnable.invoke(input, config, **kwargs)

        configs = get_config_list(config, len(inputs))
        # The executor is built from the first config only.
        with get_executor_for_config(configs[0]) as executor:
            return cast(
                "list[Output]",
                list(executor.map(invoke, runnables, actual_inputs, configs)),
            )

    async def abatch(
        self,
        inputs: list[RouterInput],
        config: Optional[Union[RunnableConfig, list[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> list[Output]:
        """Async version of ``batch``.

        The concurrency limit is read from ``max_concurrency`` of the first
        config only.

        Raises:
            ValueError: If any input's key has no associated runnable.
        """
        if not inputs:
            return []

        runnables, actual_inputs = self._select_many(inputs)

        async def ainvoke(
            runnable: Runnable, input: Input, config: RunnableConfig
        ) -> Union[Output, Exception]:
            if return_exceptions:
                try:
                    return await runnable.ainvoke(input, config, **kwargs)
                except Exception as e:
                    return e
            else:
                return await runnable.ainvoke(input, config, **kwargs)

        configs = get_config_list(config, len(inputs))
        return await gather_with_concurrency(
            configs[0].get("max_concurrency"),
            *starmap(ainvoke, zip(runnables, actual_inputs, configs)),
        )

    def stream(
        self,
        input: RouterInput,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream the output of the Runnable selected by ``input["key"]``.

        This is a generator, so the key is validated when iteration starts,
        not when ``stream`` is called.

        Raises:
            ValueError: If no Runnable is registered under the key.
        """
        runnable, actual_input = self._select(input)
        yield from runnable.stream(actual_input, config, **kwargs)

    async def astream(
        self,
        input: RouterInput,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async version of ``stream``.

        Raises:
            ValueError: If no Runnable is registered under the key.
        """
        runnable, actual_input = self._select(input)
        async for output in runnable.astream(actual_input, config, **kwargs):
            yield output