class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
    """Runnable that can fallback to other Runnables if it fails.

    External APIs (e.g., APIs for a language model) may at times experience
    degraded performance or even downtime.

    In these cases, it can be useful to have a fallback Runnable that can be
    used in place of the original Runnable (e.g., fallback to another LLM
    provider).

    Fallbacks can be defined at the level of a single Runnable, or at the level
    of a chain of Runnables. Fallbacks are tried in order until one succeeds or
    all fail.

    While you can instantiate a ``RunnableWithFallbacks`` directly, it is
    usually more convenient to use the ``with_fallbacks`` method on a Runnable.

    Example:

        .. code-block:: python

            from langchain_core.chat_models.openai import ChatOpenAI
            from langchain_core.chat_models.anthropic import ChatAnthropic

            model = ChatAnthropic(
                model="claude-3-haiku-20240307"
            ).with_fallbacks([ChatOpenAI(model="gpt-3.5-turbo-0125")])
            # Will usually use ChatAnthropic, but fallback to ChatOpenAI
            # if ChatAnthropic fails.
            model.invoke('hello')

            # And you can also use fallbacks at the level of a chain.
            # Here if both LLM providers fail, we'll fallback to a good hardcoded
            # response.

            from langchain_core.prompts import PromptTemplate
            from langchain_core.output_parsers import StrOutputParser
            from langchain_core.runnables import RunnableLambda


            def when_all_is_lost(inputs):
                return ("Looks like our LLM providers are down. "
                        "Here's a nice 🦜️ emoji for you instead.")


            chain_with_fallback = (
                PromptTemplate.from_template('Tell me a joke about {topic}')
                | model
                | StrOutputParser()
            ).with_fallbacks([RunnableLambda(when_all_is_lost)])
    """

    runnable: Runnable[Input, Output]
    """The Runnable to run first."""
    fallbacks: Sequence[Runnable[Input, Output]]
    """A sequence of fallbacks to try."""
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
    """The exceptions on which fallbacks should be tried.

    Any exception that is not a subclass of these exceptions will be raised immediately.
    """
    exception_key: Optional[str] = None
    """If string is specified then handled exceptions will be passed to fallbacks as
    part of the input under the specified key. If None, exceptions
    will not be passed to fallbacks. If used, the base Runnable and its fallbacks
    must accept a dictionary as input."""

    # Fallback targets are arbitrary Runnable objects, not pydantic models.
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @property
    @override
    def InputType(self) -> type[Input]:
        # Delegate to the primary runnable: fallbacks must accept the same input.
        return self.runnable.InputType

    @property
    @override
    def OutputType(self) -> type[Output]:
        # Delegate to the primary runnable: fallbacks must produce the same output.
        return self.runnable.OutputType

    def get_input_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> type[BaseModel]:
        """Return the input schema of the primary (first) runnable."""
        return self.runnable.get_input_schema(config)

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> type[BaseModel]:
        """Return the output schema of the primary (first) runnable."""
        return self.runnable.get_output_schema(config)

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Config specs of the primary runnable and all fallbacks, deduplicated."""
        return get_unique_config_specs(
            spec
            for step in [self.runnable, *self.fallbacks]
            for spec in step.config_specs
        )

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return True, as this class is serializable by langchain."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    @property
    def runnables(self) -> Iterator[Runnable[Input, Output]]:
        """Yield the primary runnable first, then each fallback, in order."""
        yield self.runnable
        yield from self.fallbacks
    def invoke(
        self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Invoke the primary runnable; on a handled exception, try each fallback.

        Args:
            input: Input to the Runnable. Must be a dict when ``exception_key``
                is set, so the previous error can be injected under that key.
            config: Config to use for the run.
            **kwargs: Additional keyword arguments forwarded to each attempt.

        Returns:
            The output of the first runnable (primary or fallback) to succeed.

        Raises:
            ValueError: If ``exception_key`` is set but ``input`` is not a dict,
                or if no error was recorded after all attempts (internal error).
            BaseException: Any exception not in ``exceptions_to_handle`` is
                re-raised immediately; if every runnable fails with a handled
                exception, the FIRST such exception is raised.
        """
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary."
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        # start the root run
        run_manager = callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        first_error = None  # raised at the end if every attempt fails
        last_error = None  # injected into input for the next attempt
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    # NOTE: mutates the caller-supplied dict in place.
                    input[self.exception_key] = last_error
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    # NOTE(review): `config` (not `child_config`) is passed
                    # positionally; presumably the child callbacks propagate via
                    # the config context set above — confirm before changing.
                    output = context.run(
                        runnable.invoke,
                        input,
                        config,
                        **kwargs,
                    )
            except self.exceptions_to_handle as e:
                if first_error is None:
                    first_error = e
                last_error = e
            except BaseException as e:
                # Unhandled exception types abort the fallback chain entirely.
                run_manager.on_chain_error(e)
                raise
            else:
                run_manager.on_chain_end(output)
                return output
        if first_error is None:
            msg = "No error stored at end of fallbacks."
            raise ValueError(msg)
        run_manager.on_chain_error(first_error)
        raise first_error
    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Async invoke the primary runnable; on a handled exception, try fallbacks.

        Async counterpart of ``invoke`` with identical fallback semantics.

        Args:
            input: Input to the Runnable. Must be a dict when ``exception_key``
                is set, so the previous error can be injected under that key.
            config: Config to use for the run.
            **kwargs: Additional keyword arguments forwarded to each attempt.

        Returns:
            The output of the first runnable (primary or fallback) to succeed.

        Raises:
            ValueError: If ``exception_key`` is set but ``input`` is not a dict,
                or if no error was recorded after all attempts (internal error).
            BaseException: Any exception not in ``exceptions_to_handle`` is
                re-raised immediately; if every runnable fails with a handled
                exception, the FIRST such exception is raised.
        """
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary."
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        # start the root run
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        first_error = None  # raised at the end if every attempt fails
        last_error = None  # injected into input for the next attempt
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    # NOTE: mutates the caller-supplied dict in place.
                    input[self.exception_key] = last_error
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    coro = context.run(runnable.ainvoke, input, config, **kwargs)
                    if asyncio_accepts_context():
                        # Python 3.11+: run the coroutine inside the child
                        # context so contextvars propagate across the task.
                        output = await asyncio.create_task(coro, context=context)  # type: ignore
                    else:
                        output = await coro
            except self.exceptions_to_handle as e:
                if first_error is None:
                    first_error = e
                last_error = e
            except BaseException as e:
                # Unhandled exception types abort the fallback chain entirely.
                await run_manager.on_chain_error(e)
                raise
            else:
                await run_manager.on_chain_end(output)
                return output
        if first_error is None:
            msg = "No error stored at end of fallbacks."
            raise ValueError(msg)
        await run_manager.on_chain_error(first_error)
        raise first_error
    def batch(
        self,
        inputs: list[Input],
        config: Optional[Union[RunnableConfig, list[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> list[Output]:
        """Batch-invoke with per-input fallback handling.

        All inputs are first tried against the primary runnable; only the
        inputs that failed with a handled exception are retried against each
        successive fallback.

        Args:
            inputs: Inputs to the Runnable. Must all be dicts when
                ``exception_key`` is set.
            config: A config, or one config per input.
            return_exceptions: If True, handled exceptions that persist after
                all fallbacks are returned in the output list instead of raised.
            **kwargs: Additional keyword arguments forwarded to each batch call.

        Returns:
            Outputs in input order; may contain exceptions when
            ``return_exceptions`` is True.

        Raises:
            ValueError: If ``exception_key`` is set and any input is not a dict.
            BaseException: The first unhandled exception, or (when
                ``return_exceptions`` is False) the lowest-index handled
                exception left after all fallbacks.
        """
        from langchain_core.callbacks.manager import CallbackManager

        if self.exception_key is not None and not all(
            isinstance(input, dict) for input in inputs
        ):
            msg = (
                "If 'exception_key' is specified then inputs must be dictionaries."
                f"However found a type of {type(inputs[0])} for input"
            )
            raise ValueError(msg)

        if not inputs:
            return []

        # setup callbacks
        configs = get_config_list(config, len(inputs))
        callback_managers = [
            CallbackManager.configure(
                inheritable_callbacks=config.get("callbacks"),
                local_callbacks=None,
                verbose=False,
                inheritable_tags=config.get("tags"),
                local_tags=None,
                inheritable_metadata=config.get("metadata"),
                local_metadata=None,
            )
            for config in configs
        ]
        # start the root runs, one per input
        run_managers = [
            cm.on_chain_start(
                None,
                input if isinstance(input, dict) else {"input": input},
                name=config.get("run_name") or self.get_name(),
                run_id=config.pop("run_id", None),
            )
            for cm, input, config in zip(callback_managers, inputs, configs)
        ]

        # to_return: final outputs keyed by original input index.
        # run_again: inputs (by index) still needing a fallback attempt.
        # handled_exceptions: most recent handled error per still-failing index.
        to_return: dict[int, Any] = {}
        run_again = dict(enumerate(inputs))
        handled_exceptions: dict[int, BaseException] = {}
        first_to_raise = None
        for runnable in self.runnables:
            outputs = runnable.batch(
                [input for _, input in sorted(run_again.items())],
                [
                    # each step a child run of the corresponding root run
                    patch_config(configs[i], callbacks=run_managers[i].get_child())
                    for i in sorted(run_again)
                ],
                return_exceptions=True,
                **kwargs,
            )
            # Iterate a copy: run_again is mutated (pop) during the loop.
            for (i, input), output in zip(sorted(run_again.copy().items()), outputs):
                if isinstance(output, BaseException) and not isinstance(
                    output, self.exceptions_to_handle
                ):
                    # Unhandled exception type: never retried on fallbacks.
                    if not return_exceptions:
                        first_to_raise = first_to_raise or output
                    else:
                        handled_exceptions[i] = cast("BaseException", output)
                    run_again.pop(i)
                elif isinstance(output, self.exceptions_to_handle):
                    # Handled: remember the error (and optionally inject it
                    # into the input) and retry this index on the next fallback.
                    if self.exception_key:
                        input[self.exception_key] = output  # type: ignore
                    handled_exceptions[i] = cast("BaseException", output)
                else:
                    # Success: record output and stop retrying this index.
                    run_managers[i].on_chain_end(output)
                    to_return[i] = output
                    run_again.pop(i)
                    handled_exceptions.pop(i, None)

            if first_to_raise:
                raise first_to_raise
            if not run_again:
                break

        sorted_handled_exceptions = sorted(handled_exceptions.items())
        for i, error in sorted_handled_exceptions:
            run_managers[i].on_chain_error(error)
        if not return_exceptions and sorted_handled_exceptions:
            raise sorted_handled_exceptions[0][1]
        to_return.update(handled_exceptions)
        return [output for _, output in sorted(to_return.items())]
    async def abatch(
        self,
        inputs: list[Input],
        config: Optional[Union[RunnableConfig, list[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> list[Output]:
        """Async batch-invoke with per-input fallback handling.

        Async counterpart of ``batch``: all inputs are first tried against the
        primary runnable; only the inputs that failed with a handled exception
        are retried against each successive fallback.

        Args:
            inputs: Inputs to the Runnable. Must all be dicts when
                ``exception_key`` is set.
            config: A config, or one config per input.
            return_exceptions: If True, handled exceptions that persist after
                all fallbacks are returned in the output list instead of raised.
            **kwargs: Additional keyword arguments forwarded to each batch call.

        Returns:
            Outputs in input order; may contain exceptions when
            ``return_exceptions`` is True.

        Raises:
            ValueError: If ``exception_key`` is set and any input is not a dict.
            BaseException: The first unhandled exception, or (when
                ``return_exceptions`` is False) the lowest-index handled
                exception left after all fallbacks.
        """
        from langchain_core.callbacks.manager import AsyncCallbackManager

        if self.exception_key is not None and not all(
            isinstance(input, dict) for input in inputs
        ):
            msg = (
                "If 'exception_key' is specified then inputs must be dictionaries."
                f"However found a type of {type(inputs[0])} for input"
            )
            raise ValueError(msg)

        if not inputs:
            return []

        # setup callbacks
        configs = get_config_list(config, len(inputs))
        callback_managers = [
            AsyncCallbackManager.configure(
                inheritable_callbacks=config.get("callbacks"),
                local_callbacks=None,
                verbose=False,
                inheritable_tags=config.get("tags"),
                local_tags=None,
                inheritable_metadata=config.get("metadata"),
                local_metadata=None,
            )
            for config in configs
        ]
        # start the root runs, one per input
        run_managers: list[AsyncCallbackManagerForChainRun] = await asyncio.gather(
            *(
                cm.on_chain_start(
                    None,
                    input,
                    name=config.get("run_name") or self.get_name(),
                    run_id=config.pop("run_id", None),
                )
                for cm, input, config in zip(callback_managers, inputs, configs)
            )
        )

        # to_return: final outputs keyed by original input index.
        # run_again: inputs (by index) still needing a fallback attempt.
        # handled_exceptions: most recent handled error per still-failing index.
        to_return = {}
        run_again = dict(enumerate(inputs))
        handled_exceptions: dict[int, BaseException] = {}
        first_to_raise = None
        for runnable in self.runnables:
            outputs = await runnable.abatch(
                [input for _, input in sorted(run_again.items())],
                [
                    # each step a child run of the corresponding root run
                    patch_config(configs[i], callbacks=run_managers[i].get_child())
                    for i in sorted(run_again)
                ],
                return_exceptions=True,
                **kwargs,
            )
            # Iterate a copy: run_again is mutated (pop) during the loop.
            for (i, input), output in zip(sorted(run_again.copy().items()), outputs):
                if isinstance(output, BaseException) and not isinstance(
                    output, self.exceptions_to_handle
                ):
                    # Unhandled exception type: never retried on fallbacks.
                    if not return_exceptions:
                        first_to_raise = first_to_raise or output
                    else:
                        handled_exceptions[i] = cast("BaseException", output)
                    run_again.pop(i)
                elif isinstance(output, self.exceptions_to_handle):
                    # Handled: remember the error (and optionally inject it
                    # into the input) and retry this index on the next fallback.
                    if self.exception_key:
                        input[self.exception_key] = output  # type: ignore
                    handled_exceptions[i] = cast("BaseException", output)
                else:
                    # Success: record output and stop retrying this index.
                    to_return[i] = output
                    await run_managers[i].on_chain_end(output)
                    run_again.pop(i)
                    handled_exceptions.pop(i, None)

            if first_to_raise:
                raise first_to_raise
            if not run_again:
                break

        sorted_handled_exceptions = sorted(handled_exceptions.items())
        await asyncio.gather(
            *(
                run_managers[i].on_chain_error(error)
                for i, error in sorted_handled_exceptions
            )
        )
        if not return_exceptions and sorted_handled_exceptions:
            raise sorted_handled_exceptions[0][1]
        to_return.update(handled_exceptions)
        return [output for _, output in sorted(to_return.items())]  # type: ignore
    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream from the primary runnable, falling back on early failure.

        Fallbacks are only attempted while producing the FIRST chunk: once a
        runnable has yielded its first chunk, the rest of its stream is passed
        through as-is and any later error is raised without trying fallbacks.

        Args:
            input: Input to the Runnable. Must be a dict when ``exception_key``
                is set, so the previous error can be injected under that key.
            config: Config to use for the run.
            **kwargs: Additional keyword arguments forwarded to each attempt.

        Yields:
            Output chunks from the first runnable that produces a first chunk.

        Raises:
            ValueError: If ``exception_key`` is set but ``input`` is not a dict.
            BaseException: Any unhandled exception immediately; the first
                handled exception if every runnable fails before yielding.
        """
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary."
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        # start the root run
        run_manager = callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        first_error = None  # raised at the end if every attempt fails
        last_error = None  # injected into input for the next attempt
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    # NOTE: mutates the caller-supplied dict in place.
                    input[self.exception_key] = last_error
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    # NOTE(review): no config is passed positionally here;
                    # presumably the child config propagates via the config
                    # context set above — confirm before changing.
                    stream = context.run(
                        runnable.stream,
                        input,
                        **kwargs,
                    )
                    # Pull the first chunk inside the try so a failure here
                    # still triggers the fallback chain.
                    chunk: Output = context.run(next, stream)  # type: ignore
            except self.exceptions_to_handle as e:
                first_error = e if first_error is None else first_error
                last_error = e
            except BaseException as e:
                run_manager.on_chain_error(e)
                raise
            else:
                first_error = None
                break
        if first_error:
            run_manager.on_chain_error(first_error)
            raise first_error
        yield chunk
        # Accumulate chunks (if they support +) to report a final output.
        output: Optional[Output] = chunk
        try:
            for chunk in stream:
                yield chunk
                try:
                    output = output + chunk  # type: ignore
                except TypeError:
                    # Chunks are not additive; give up on accumulation.
                    output = None
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        run_manager.on_chain_end(output)
    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async stream from the primary runnable, falling back on early failure.

        Async counterpart of ``stream``: fallbacks are only attempted while
        producing the FIRST chunk; once a runnable has yielded its first chunk,
        the rest of its stream is passed through as-is and any later error is
        raised without trying fallbacks.

        Args:
            input: Input to the Runnable. Must be a dict when ``exception_key``
                is set, so the previous error can be injected under that key.
            config: Config to use for the run.
            **kwargs: Additional keyword arguments forwarded to each attempt.

        Yields:
            Output chunks from the first runnable that produces a first chunk.

        Raises:
            ValueError: If ``exception_key`` is set but ``input`` is not a dict.
            BaseException: Any unhandled exception immediately; the first
                handled exception if every runnable fails before yielding.
        """
        if self.exception_key is not None and not isinstance(input, dict):
            msg = (
                "If 'exception_key' is specified then input must be a dictionary."
                f"However found a type of {type(input)} for input"
            )
            raise ValueError(msg)
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        # start the root run
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        first_error = None  # raised at the end if every attempt fails
        last_error = None  # injected into input for the next attempt
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    # NOTE: mutates the caller-supplied dict in place.
                    input[self.exception_key] = last_error
                child_config = patch_config(config, callbacks=run_manager.get_child())
                with set_config_context(child_config) as context:
                    stream = runnable.astream(
                        input,
                        child_config,
                        **kwargs,
                    )
                    # Pull the first chunk inside the try so a failure here
                    # still triggers the fallback chain.
                    if asyncio_accepts_context():
                        chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
                            py_anext(stream),  # type: ignore[arg-type]
                            context=context,
                        )
                    else:
                        chunk = cast("Output", await py_anext(stream))
            except self.exceptions_to_handle as e:
                first_error = e if first_error is None else first_error
                last_error = e
            except BaseException as e:
                await run_manager.on_chain_error(e)
                raise
            else:
                first_error = None
                break
        if first_error:
            await run_manager.on_chain_error(first_error)
            raise first_error
        yield chunk
        # Accumulate chunks (if they support +) to report a final output.
        output: Optional[Output] = chunk
        try:
            async for chunk in stream:
                yield chunk
                try:
                    output = output + chunk  # type: ignore
                except TypeError:
                    # Chunks are not additive; give up on accumulation.
                    output = None
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        await run_manager.on_chain_end(output)
def__getattr__(self,name:str)->Any:"""Get an attribute from the wrapped Runnable and its fallbacks. Returns: If the attribute is anything other than a method that outputs a Runnable, returns getattr(self.runnable, name). If the attribute is a method that does return a new Runnable (e.g. llm.bind_tools([...]) outputs a new RunnableBinding) then self.runnable and each of the runnables in self.fallbacks is replaced with getattr(x, name). Example: .. code-block:: python from langchain_openai import ChatOpenAI from langchain_anthropic import ChatAnthropic gpt_4o = ChatOpenAI(model="gpt-4o") claude_3_sonnet = ChatAnthropic(model="claude-3-sonnet-20240229") llm = gpt_4o.with_fallbacks([claude_3_sonnet]) llm.model_name # -> "gpt-4o" # .bind_tools() is called on both ChatOpenAI and ChatAnthropic # Equivalent to: # gpt_4o.bind_tools([...]).with_fallbacks([claude_3_sonnet.bind_tools([...])]) llm.bind_tools([...]) # -> RunnableWithFallbacks( runnable=RunnableBinding(bound=ChatOpenAI(...), kwargs={"tools": [...]}), fallbacks=[RunnableBinding(bound=ChatAnthropic(...), kwargs={"tools": [...]})], ) """# noqa: E501attr=getattr(self.runnable,name)if_returns_runnable(attr):@wraps(attr)defwrapped(*args:Any,**kwargs:Any)->Any:new_runnable=attr(*args,**kwargs)new_fallbacks=[]forfallbackinself.fallbacks:fallback_attr=getattr(fallback,name)new_fallbacks.append(fallback_attr(*args,**kwargs))returnself.__class__(**{**self.model_dump(),"runnable":new_runnable,"fallbacks":new_fallbacks,})returnwrappedreturnattr