class RunnableRetry(RunnableBindingBase[Input, Output]):
    """Retry a Runnable if it fails.

    RunnableRetry can be used to add retry logic to any object
    that subclasses the base Runnable.

    Such retries are especially useful for network calls that may fail
    due to transient errors.

    The RunnableRetry is implemented as a RunnableBinding. The easiest
    way to use it is through the `.with_retry()` method on all Runnables.

    Example:

    Here's an example that uses a RunnableLambda to raise an exception

        .. code-block:: python

            import time

            def foo(input) -> None:
                '''Fake function that raises an exception.'''
                raise ValueError(f"Invoking foo failed. At time {time.time()}")

            runnable = RunnableLambda(foo)

            runnable_with_retries = runnable.with_retry(
                retry_if_exception_type=(ValueError,), # Retry only on ValueError
                wait_exponential_jitter=True, # Add jitter to the exponential backoff
                stop_after_attempt=2, # Try twice
            )

            # The method invocation above is equivalent to the longer form below:

            runnable_with_retries = RunnableRetry(
                bound=runnable,
                retry_exception_types=(ValueError,),
                max_attempt_number=2,
                wait_exponential_jitter=True
            )

    This logic can be used to retry any Runnable, including a chain of Runnables,
    but in general it's best practice to keep the scope of the retry as small as
    possible. For example, if you have a chain of Runnables, you should only retry
    the Runnable that is likely to fail, not the entire chain.

    Example:

        .. code-block:: python

            from langchain_core.chat_models import ChatOpenAI
            from langchain_core.prompts import PromptTemplate

            template = PromptTemplate.from_template("tell me a joke about {topic}.")
            model = ChatOpenAI(temperature=0.5)

            # Good
            chain = template | model.with_retry()

            # Bad
            chain = template | model
            retryable_chain = chain.with_retry()
    """

    retry_exception_types: Tuple[Type[BaseException], ...] = (Exception,)
    """The exception types to retry on. By default all exceptions are retried.

    In general you should only retry on exceptions that are likely to be
    transient, such as network errors.

    Good exceptions to retry are all server errors (5xx) and selected client
    errors (4xx) such as 429 Too Many Requests.
    """

    wait_exponential_jitter: bool = True
    """Whether to add jitter to the exponential backoff."""

    max_attempt_number: int = 3
    """The maximum number of attempts to retry the Runnable."""

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    @property
    def _kwargs_retrying(self) -> Dict[str, Any]:
        """Build the keyword arguments shared by the sync and async retriers.

        Each setting is only forwarded when it is truthy; a falsy value
        (``0``, ``False``, empty tuple) leaves the corresponding argument
        out, so the retrier's own default applies for it.
        """
        kwargs: Dict[str, Any] = {}

        if self.max_attempt_number:
            kwargs["stop"] = stop_after_attempt(self.max_attempt_number)

        if self.wait_exponential_jitter:
            kwargs["wait"] = wait_exponential_jitter()

        if self.retry_exception_types:
            kwargs["retry"] = retry_if_exception_type(self.retry_exception_types)

        return kwargs

    def _sync_retrying(self, **kwargs: Any) -> Retrying:
        """Return a ``Retrying`` built from this object's settings plus ``kwargs``."""
        return Retrying(**self._kwargs_retrying, **kwargs)

    def _async_retrying(self, **kwargs: Any) -> AsyncRetrying:
        """Return an ``AsyncRetrying`` built from this object's settings plus ``kwargs``."""
        return AsyncRetrying(**self._kwargs_retrying, **kwargs)

    def _patch_config(
        self,
        config: RunnableConfig,
        run_manager: "T",
        retry_state: RetryCallState,
    ) -> RunnableConfig:
        """Return ``config`` with callbacks replaced by a child of ``run_manager``.

        Attempts after the first are tagged ``retry:attempt:<n>``; the first
        attempt requests the child manager with a tag of ``None``.
        """
        attempt = retry_state.attempt_number
        tag = f"retry:attempt:{attempt}" if attempt > 1 else None
        return patch_config(config, callbacks=run_manager.get_child(tag))

    def _patch_config_list(
        self,
        config: List[RunnableConfig],
        run_manager: List["T"],
        retry_state: RetryCallState,
    ) -> List[RunnableConfig]:
        """Apply ``_patch_config`` pairwise to ``config`` and ``run_manager``."""
        return [
            self._patch_config(c, rm, retry_state) for c, rm in zip(config, run_manager)
        ]

    def _invoke(
        self,
        input: Input,
        run_manager: "CallbackManagerForChainRun",
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Output:
        """Invoke the bound Runnable, retrying according to this object's settings.

        The retrier is created with ``reraise=True``.
        """
        for attempt in self._sync_retrying(reraise=True):
            with attempt:
                result = super().invoke(
                    input,
                    self._patch_config(config, run_manager, attempt.retry_state),
                    **kwargs,
                )
            # Record the actual return value on the retry state once the
            # attempt is known to have completed without failing.
            if attempt.retry_state.outcome and not attempt.retry_state.outcome.failed:
                attempt.retry_state.set_result(result)
        return result

    def _batch(
        self,
        inputs: List[Input],
        run_manager: List["CallbackManagerForChainRun"],
        config: List[RunnableConfig],
        **kwargs: Any,
    ) -> List[Union[Output, Exception]]:
        """Run a batch, re-submitting only the inputs that have not succeeded yet.

        Args:
            inputs: The inputs to process.
            run_manager: One run manager per input, aligned with ``inputs``.
            config: One config per input, aligned with ``inputs``.
            **kwargs: Forwarded to the parent ``batch`` call.

        Returns:
            A list aligned with ``inputs``. Each slot holds that input's output
            or, if it never succeeded, the last exception recorded for it. If
            no per-input exception was recorded, the slot holds the
            ``RetryError`` that ended the retry loop.
        """
        # Successful outputs, keyed by index into the ORIGINAL ``inputs`` list.
        results_map: Dict[int, Output] = {}
        # Most recent exception seen for each original index.
        failures: Dict[int, Exception] = {}

        try:
            for attempt in self._sync_retrying():
                with attempt:
                    # Original indices of the inputs that have not succeeded yet.
                    remaining = [
                        idx for idx in range(len(inputs)) if idx not in results_map
                    ]
                    result = super().batch(
                        [inputs[idx] for idx in remaining],
                        self._patch_config_list(
                            [config[idx] for idx in remaining],
                            [run_manager[idx] for idx in remaining],
                            attempt.retry_state,
                        ),
                        return_exceptions=True,
                        **kwargs,
                    )
                    # ``result`` is positional within ``remaining``; translate
                    # each position back to the original index before storing.
                    first_exception = None
                    for idx, r in zip(remaining, result):
                        if isinstance(r, Exception):
                            failures[idx] = r
                            if first_exception is None:
                                first_exception = r
                            continue
                        results_map[idx] = r
                    # If any exception occurred, raise it, to retry the failed ones
                    if first_exception is not None:
                        raise first_exception
                if (
                    attempt.retry_state.outcome
                    and not attempt.retry_state.outcome.failed
                ):
                    attempt.retry_state.set_result(result)
        except RetryError as e:
            # Inputs with no recorded exception of their own (the batch call
            # itself raised) report the RetryError instead.
            for idx in range(len(inputs)):
                if idx not in results_map:
                    failures.setdefault(idx, e)

        return [
            results_map[idx] if idx in results_map else failures[idx]
            for idx in range(len(inputs))
        ]

    async def _abatch(
        self,
        inputs: List[Input],
        run_manager: List["AsyncCallbackManagerForChainRun"],
        config: List[RunnableConfig],
        **kwargs: Any,
    ) -> List[Union[Output, Exception]]:
        """Async counterpart of ``_batch``: retry only the inputs still failing.

        Args:
            inputs: The inputs to process.
            run_manager: One run manager per input, aligned with ``inputs``.
            config: One config per input, aligned with ``inputs``.
            **kwargs: Forwarded to the parent ``abatch`` call.

        Returns:
            A list aligned with ``inputs``. Each slot holds that input's output
            or, if it never succeeded, the last exception recorded for it. If
            no per-input exception was recorded, the slot holds the
            ``RetryError`` that ended the retry loop.
        """
        # Successful outputs, keyed by index into the ORIGINAL ``inputs`` list.
        results_map: Dict[int, Output] = {}
        # Most recent exception seen for each original index.
        failures: Dict[int, Exception] = {}

        try:
            async for attempt in self._async_retrying():
                with attempt:
                    # Original indices of the inputs that have not succeeded yet.
                    remaining = [
                        idx for idx in range(len(inputs)) if idx not in results_map
                    ]
                    result = await super().abatch(
                        [inputs[idx] for idx in remaining],
                        self._patch_config_list(
                            [config[idx] for idx in remaining],
                            [run_manager[idx] for idx in remaining],
                            attempt.retry_state,
                        ),
                        return_exceptions=True,
                        **kwargs,
                    )
                    # ``result`` is positional within ``remaining``; translate
                    # each position back to the original index before storing.
                    first_exception = None
                    for idx, r in zip(remaining, result):
                        if isinstance(r, Exception):
                            failures[idx] = r
                            if first_exception is None:
                                first_exception = r
                            continue
                        results_map[idx] = r
                    # If any exception occurred, raise it, to retry the failed ones
                    if first_exception is not None:
                        raise first_exception
                if (
                    attempt.retry_state.outcome
                    and not attempt.retry_state.outcome.failed
                ):
                    attempt.retry_state.set_result(result)
        except RetryError as e:
            # Inputs with no recorded exception of their own (the batch call
            # itself raised) report the RetryError instead.
            for idx in range(len(inputs)):
                if idx not in results_map:
                    failures.setdefault(idx, e)

        return [
            results_map[idx] if idx in results_map else failures[idx]
            for idx in range(len(inputs))
        ]