from__future__importannotationsimportasyncioimportuuidimportwarningsfromcollections.abcimportAwaitable,Generator,Iterable,Iterator,Sequencefromconcurrent.futuresimportExecutor,Future,ThreadPoolExecutorfromcontextlibimportcontextmanagerfromcontextvarsimportContext,ContextVar,Token,copy_contextfromfunctoolsimportpartialfromtypingimportTYPE_CHECKING,Any,Callable,Optional,TypeVar,Union,castfromtyping_extensionsimportParamSpec,TypedDictfromlangchain_core.runnables.utilsimport(Input,Output,accepts_config,accepts_run_manager,)ifTYPE_CHECKING:fromlangchain_core.callbacks.baseimportBaseCallbackManager,Callbacksfromlangchain_core.callbacks.managerimport(AsyncCallbackManager,AsyncCallbackManagerForChainRun,CallbackManager,CallbackManagerForChainRun,)else:# Pydantic validates through typed dicts, but# the callbacks need forward refs updatedCallbacks=Optional[Union[list,Any]]
class RunnableConfig(TypedDict, total=False):
    """Configuration for a Runnable.

    Declared with ``total=False``, so every key is optional. ``ensure_config``
    in this module fills in defaults for ``tags``, ``metadata``, ``callbacks``,
    ``recursion_limit`` and ``configurable`` when they are missing.
    """

    # Merged across configs by ``merge_configs`` as a sorted, de-duplicated list.
    tags: list[str]
    """ Tags for this call and any sub-calls (eg. a Chain calling an LLM). You can use these to filter calls. """

    # ``ensure_config`` also copies primitive (str/int/float/bool) values from
    # ``configurable`` into this mapping when the key is not already present
    # and does not start with "__".
    metadata: dict[str, Any]
    """ Metadata for this call and any sub-calls (eg. a Chain calling an LLM). Keys should be strings, values should be JSON-serializable. """

    # ``merge_configs`` treats this as one of: None, a list of handlers, or a
    # callback manager.
    callbacks: Callbacks
    """ Callbacks for this call and any sub-calls (eg. a Chain calling an LLM). Tags are passed to all callbacks, metadata is passed to handle*Start callbacks. """

    # Dropped by ``patch_config`` whenever the callbacks are replaced.
    run_name: str
    """ Name for the tracer run for this call. Defaults to the name of the class. """

    # Passed as ``max_workers`` by ``get_executor_for_config``.
    max_concurrency: Optional[int]
    """ Maximum number of parallel calls to make. If not provided, defaults to ThreadPoolExecutor's default. """

    # The default of 25 is ``DEFAULT_RECURSION_LIMIT`` in this module.
    recursion_limit: int
    """ Maximum number of times a call can recurse. If not provided, defaults to 25. """

    # NOTE(review): the text below points readers at ``.output_schema()``;
    # ``.config_schema()`` looks like the intended reference - confirm.
    configurable: dict[str, Any]
    """ Runtime values for attributes previously made configurable on this Runnable, or sub-Runnables, through .configurable_fields() or .configurable_alternatives(). Check .output_schema() for a description of the attributes that have been made configurable. """

    # Dropped by ``patch_config`` whenever the callbacks are replaced, and
    # applied only to the first element of a batch by ``get_config_list``.
    run_id: Optional[uuid.UUID]
    """ Unique identifier for the tracer run for this call. If not provided, a new UUID will be generated. """
# Keys recognised as first-class RunnableConfig entries.
CONFIG_KEYS = [
    "tags",
    "metadata",
    "callbacks",
    "run_name",
    "max_concurrency",
    "recursion_limit",
    "configurable",
    "run_id",
]

# Keys whose values are copied (via ``.copy()``) rather than shared.
COPIABLE_KEYS = [
    "tags",
    "metadata",
    "callbacks",
    "configurable",
]

DEFAULT_RECURSION_LIMIT = 25

var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
    "child_runnable_config", default=None
)


# This is imported and used in langgraph, so don't break.
def _set_config_context(
    config: RunnableConfig,
) -> tuple[Token[Optional[RunnableConfig]], Optional[dict[str, Any]]]:
    """Set the child Runnable config + tracing context.

    Args:
        config (RunnableConfig): The config to set.

    Returns:
        A pair of (token from setting ``var_child_runnable_config``, the
        tracing context that was active before a parent run was installed).
        The second item is None when no parent run was installed.
    """
    from langchain_core.tracers.langchain import LangChainTracer

    config_token = var_child_runnable_config.set(config)
    previous_tracing_context = None

    callbacks = config.get("callbacks")
    if callbacks:
        # Only callback managers expose ``parent_run_id``; plain handler lists
        # fall through here.
        parent_run_id = getattr(callbacks, "parent_run_id", None)
        if parent_run_id:
            tracer = next(
                (
                    candidate
                    for candidate in getattr(callbacks, "handlers", [])
                    if isinstance(candidate, LangChainTracer)
                ),
                None,
            )
            if tracer:
                run = tracer.run_map.get(str(parent_run_id))
                if run:
                    from langsmith.run_helpers import (
                        _set_tracing_context,
                        get_tracing_context,
                    )

                    previous_tracing_context = get_tracing_context()
                    _set_tracing_context({"parent": run})

    return config_token, previous_tracing_context
@contextmanager
def set_config_context(config: RunnableConfig) -> Generator[Context, None, None]:
    """Set the child Runnable config + tracing context.

    The config is installed inside a copy of the current context; that copy is
    what gets yielded, so callers run their work through ``ctx.run(...)``.

    Args:
        config (RunnableConfig): The config to set.

    Yields:
        Context: The copied context in which the config has been set.
    """
    from langsmith.run_helpers import _set_tracing_context

    # Values written back into the copied context's tracing state on exit.
    cleared_tracing_context = {
        "parent": None,
        "project_name": None,
        "tags": None,
        "metadata": None,
        "enabled": None,
        "client": None,
    }

    child_context = copy_context()
    reset_token, _ = child_context.run(_set_config_context, config)
    try:
        yield child_context
    finally:
        child_context.run(var_child_runnable_config.reset, reset_token)
        child_context.run(_set_tracing_context, cleared_tracing_context)
def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Ensure that a config is a dict with all keys present.

    Values are layered in this order: built-in defaults, then the config held
    in ``var_child_runnable_config`` (if any), then the ``config`` argument.
    Keys of ``config`` that are not in ``CONFIG_KEYS`` are moved into
    ``configurable``. Primitive ``configurable`` values are then mirrored into
    ``metadata`` unless the key starts with "__" or is already present.

    Args:
        config (Optional[RunnableConfig], optional): The config to ensure.
            Defaults to None.

    Returns:
        RunnableConfig: The ensured config.
    """

    def _snapshot(key: str, value: Any) -> Any:
        # Copy container-like entries so the caller's objects are not shared.
        return value.copy() if key in COPIABLE_KEYS else value

    ensured = RunnableConfig(
        tags=[],
        metadata={},
        callbacks=None,
        recursion_limit=DEFAULT_RECURSION_LIMIT,
        configurable={},
    )

    inherited = var_child_runnable_config.get()
    if inherited:
        ensured.update(
            cast(
                RunnableConfig,
                {
                    name: _snapshot(name, item)
                    for name, item in inherited.items()
                    if item is not None
                },
            )
        )

    if config is not None:
        # First pass: recognised keys override whatever has been collected.
        ensured.update(
            cast(
                RunnableConfig,
                {
                    name: _snapshot(name, item)
                    for name, item in config.items()
                    if item is not None and name in CONFIG_KEYS
                },
            )
        )
        # Second pass (must come after the first, which may have replaced
        # "configurable"): unrecognised keys become configurable values.
        for name, item in config.items():
            if name not in CONFIG_KEYS and item is not None:
                ensured["configurable"][name] = item

    for name, item in ensured.get("configurable", {}).items():
        if name.startswith("__"):
            continue
        if not isinstance(item, (str, int, float, bool)):
            continue
        if name not in ensured["metadata"]:
            ensured["metadata"][name] = item

    return ensured
def get_config_list(
    config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]], length: int
) -> list[RunnableConfig]:
    """Get a list of configs from a single config or a list of configs.

    It is useful for subclasses overriding batch() or abatch().

    Args:
        config (Optional[Union[RunnableConfig, List[RunnableConfig]]]):
            The config or list of configs.
        length (int): The length of the list.

    Returns:
        List[RunnableConfig]: The list of configs, each passed through
        ``ensure_config``.

    Raises:
        ValueError: If ``length`` is negative, or if a sequence of configs is
            given whose length is not equal to the length of the inputs.
    """
    if length < 0:
        msg = f"length must be >= 0, but got {length}"
        raise ValueError(msg)

    if isinstance(config, Sequence):
        if len(config) != length:
            msg = (
                f"config must be a list of the same length as inputs, "
                f"but got {len(config)} configs for {length} inputs"
            )
            raise ValueError(msg)
        return list(map(ensure_config, config))

    if length > 1 and isinstance(config, dict) and config.get("run_id") is not None:
        # A run_id must be unique per run, so only the first batch element
        # keeps it; the remaining elements get the same config without it.
        warnings.warn(
            "Provided run_id will be used only for the first element of the batch.",
            category=RuntimeWarning,
            stacklevel=3,
        )
        subsequent = cast(
            RunnableConfig, {k: v for k, v in config.items() if k != "run_id"}
        )
        return [
            ensure_config(config),
            *(ensure_config(subsequent) for _ in range(length - 1)),
        ]

    return [ensure_config(config) for _ in range(length)]
def patch_config(
    config: Optional[RunnableConfig],
    *,
    callbacks: Optional[BaseCallbackManager] = None,
    recursion_limit: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    run_name: Optional[str] = None,
    configurable: Optional[dict[str, Any]] = None,
) -> RunnableConfig:
    """Patch a config with new values.

    The input is first normalised with ``ensure_config``; only arguments that
    are not None are applied on top of it.

    Args:
        config (Optional[RunnableConfig]): The config to patch.
        callbacks (Optional[BaseCallbackManager], optional): The callbacks to
            set. Defaults to None.
        recursion_limit (Optional[int], optional): The recursion limit to set.
            Defaults to None.
        max_concurrency (Optional[int], optional): The max concurrency to set.
            Defaults to None.
        run_name (Optional[str], optional): The run name to set.
            Defaults to None.
        configurable (Optional[Dict[str, Any]], optional): The configurable to
            set; merged over any existing configurable values.
            Defaults to None.

    Returns:
        RunnableConfig: The patched config.
    """
    patched = ensure_config(config)

    if callbacks is not None:
        # run_name and run_id belong to the run that owned the original
        # callbacks, so they are dropped once the callbacks are replaced.
        patched["callbacks"] = callbacks
        patched.pop("run_name", None)  # type: ignore[misc]
        patched.pop("run_id", None)  # type: ignore[misc]

    scalar_overrides = {
        "recursion_limit": recursion_limit,
        "max_concurrency": max_concurrency,
        "run_name": run_name,
    }
    for field, override in scalar_overrides.items():
        if override is not None:
            patched[field] = override  # type: ignore[literal-required]

    if configurable is not None:
        patched["configurable"] = {**patched.get("configurable", {}), **configurable}

    return patched
def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig:
    """Merge multiple configs into one.

    Configs are processed left to right, each one normalised by
    ``ensure_config`` first. ``metadata`` and ``configurable`` are merged as
    dicts (later values win), ``tags`` become a sorted de-duplicated list,
    ``callbacks`` are combined, ``recursion_limit`` is only taken when it
    differs from the default, and remaining keys take the later truthy value.

    Args:
        *configs (Optional[RunnableConfig]): The configs to merge. None
            entries are skipped.

    Returns:
        RunnableConfig: The merged config.
    """
    merged: RunnableConfig = {}

    for candidate in configs:
        if candidate is None:
            continue
        current = ensure_config(candidate)

        # Even though the keys aren't literals, this is correct
        # because both dicts are the same type
        for key in current:
            if key in ("metadata", "configurable"):
                merged[key] = {  # type: ignore
                    **merged.get(key, {}),  # type: ignore
                    **(current.get(key) or {}),  # type: ignore
                }
            elif key == "tags":
                combined_tags = merged.get(key, []) + (current.get(key) or [])  # type: ignore
                merged[key] = sorted(set(combined_tags))  # type: ignore
            elif key == "callbacks":
                incoming = current["callbacks"]
                if incoming is None:
                    # Nothing to merge; leave whatever is already collected.
                    continue
                existing = merged.get("callbacks")

                # callbacks can be either None, list[handler] or manager
                existing_is_list = isinstance(existing, list)
                incoming_is_list = isinstance(incoming, list)
                if existing is None:
                    merged["callbacks"] = incoming.copy()
                elif existing_is_list and incoming_is_list:
                    merged["callbacks"] = existing + incoming
                elif existing_is_list or incoming_is_list:
                    # Exactly one side is a manager: copy it and attach the
                    # other side's handlers as inheritable handlers.
                    if existing_is_list:
                        manager, extra_handlers = incoming, existing
                    else:
                        manager, extra_handlers = existing, incoming
                    combined_manager = manager.copy()
                    for handler in extra_handlers:
                        combined_manager.add_handler(handler, inherit=True)
                    merged["callbacks"] = combined_manager
                else:
                    # Both sides are managers.
                    merged["callbacks"] = existing.merge(incoming)
            elif key == "recursion_limit":
                limit = current["recursion_limit"]
                if limit != DEFAULT_RECURSION_LIMIT:
                    merged["recursion_limit"] = limit
            elif key in COPIABLE_KEYS and current[key] is not None:  # type: ignore[literal-required]
                merged[key] = current[key].copy()  # type: ignore[literal-required]
            else:
                merged[key] = current[key] or merged.get(key)  # type: ignore

    return merged
def call_func_with_variable_args(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input, RunnableConfig], Output],
        Callable[[Input, CallbackManagerForChainRun], Output],
        Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[CallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Output:
    """Call function that may optionally accept a run_manager and/or config.

    Args:
        func (Union[Callable[[Input], Output],
            Callable[[Input, CallbackManagerForChainRun], Output],
            Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]]):
            The function to call.
        input (Input): The input to the function.
        config (RunnableConfig): The config to pass to the function. When a
            run manager is given, the function receives a patched copy whose
            callbacks are the run manager's child manager.
        run_manager (CallbackManagerForChainRun): The run manager to
            pass to the function. Defaults to None.
        **kwargs (Any): The keyword arguments to pass to the function.

    Returns:
        Output: The output of the function.
    """
    has_run_manager = run_manager is not None

    if accepts_config(func):
        kwargs["config"] = (
            patch_config(config, callbacks=run_manager.get_child())
            if has_run_manager
            else config
        )
    if has_run_manager and accepts_run_manager(func):
        kwargs["run_manager"] = run_manager

    return func(input, **kwargs)  # type: ignore[call-arg]
def acall_func_with_variable_args(
    func: Union[
        Callable[[Input], Awaitable[Output]],
        Callable[[Input, RunnableConfig], Awaitable[Output]],
        Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
        Callable[
            [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
            Awaitable[Output],
        ],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Awaitable[Output]:
    """Async call function that may optionally accept a run_manager and/or config.

    This function is not itself a coroutine: it calls ``func`` and hands back
    whatever awaitable ``func`` returns.

    Args:
        func (Union[Callable[[Input], Awaitable[Output]], Callable[[Input,
            AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input,
            AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]):
            The function to call.
        input (Input): The input to the function.
        config (RunnableConfig): The config to pass to the function. When a
            run manager is given, the function receives a patched copy whose
            callbacks are the run manager's child manager.
        run_manager (AsyncCallbackManagerForChainRun): The run manager
            to pass to the function. Defaults to None.
        **kwargs (Any): The keyword arguments to pass to the function.

    Returns:
        Awaitable[Output]: The awaitable produced by calling the function.
    """
    has_run_manager = run_manager is not None

    if accepts_config(func):
        kwargs["config"] = (
            patch_config(config, callbacks=run_manager.get_child())
            if has_run_manager
            else config
        )
    if has_run_manager and accepts_run_manager(func):
        kwargs["run_manager"] = run_manager

    return func(input, **kwargs)  # type: ignore[call-arg]
def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager:
    """Get a callback manager for a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` entries are passed
    to ``CallbackManager.configure`` as the inheritable values.

    Args:
        config (RunnableConfig): The config.

    Returns:
        CallbackManager: The callback manager.
    """
    from langchain_core.callbacks.manager import CallbackManager

    callbacks = config.get("callbacks")
    tags = config.get("tags")
    metadata = config.get("metadata")
    return CallbackManager.configure(
        inheritable_callbacks=callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )
def get_async_callback_manager_for_config(
    config: RunnableConfig,
) -> AsyncCallbackManager:
    """Get an async callback manager for a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` entries are passed
    to ``AsyncCallbackManager.configure`` as the inheritable values.

    Args:
        config (RunnableConfig): The config.

    Returns:
        AsyncCallbackManager: The async callback manager.
    """
    from langchain_core.callbacks.manager import AsyncCallbackManager

    callbacks = config.get("callbacks")
    tags = config.get("tags")
    metadata = config.get("metadata")
    return AsyncCallbackManager.configure(
        inheritable_callbacks=callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )
P=ParamSpec("P")T=TypeVar("T")
[docs]classContextThreadPoolExecutor(ThreadPoolExecutor):"""ThreadPoolExecutor that copies the context to the child thread."""
[docs]defsubmit(# type: ignore[override]self,func:Callable[P,T],*args:P.args,**kwargs:P.kwargs,)->Future[T]:"""Submit a function to the executor. Args: func (Callable[..., T]): The function to submit. *args (Any): The positional arguments to the function. **kwargs (Any): The keyword arguments to the function. Returns: Future[T]: The future for the function. """returnsuper().submit(cast(Callable[...,T],partial(copy_context().run,func,*args,**kwargs)))
[docs]defmap(self,fn:Callable[...,T],*iterables:Iterable[Any],timeout:float|None=None,chunksize:int=1,)->Iterator[T]:"""Map a function to multiple iterables. Args: fn (Callable[..., T]): The function to map. *iterables (Iterable[Any]): The iterables to map over. timeout (float | None, optional): The timeout for the map. Defaults to None. chunksize (int, optional): The chunksize for the map. Defaults to 1. Returns: Iterator[T]: The iterator for the mapped function. """contexts=[copy_context()for_inrange(len(iterables[0]))]# type: ignore[arg-type]def_wrapped_fn(*args:Any)->T:returncontexts.pop().run(fn,*args)returnsuper().map(_wrapped_fn,*iterables,timeout=timeout,chunksize=chunksize,)
@contextmanager
def get_executor_for_config(
    config: Optional[RunnableConfig],
) -> Generator[Executor, None, None]:
    """Get an executor for a config.

    The executor is a ``ContextThreadPoolExecutor`` whose ``max_workers`` is
    the config's ``max_concurrency`` (None when the config is missing or has
    no such key). It is shut down when the ``with`` block exits.

    Args:
        config (RunnableConfig): The config.

    Yields:
        Generator[Executor, None, None]: The executor.
    """
    worker_limit = (config or {}).get("max_concurrency")
    with ContextThreadPoolExecutor(max_workers=worker_limit) as pool:
        yield pool
async def run_in_executor(
    executor_or_config: Optional[Union[Executor, RunnableConfig]],
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run a function in an executor.

    When ``executor_or_config`` is None or a dict (a config), the event loop's
    default executor is used and the function runs inside a copy of the
    current context. Otherwise the given executor is used as-is.

    Args:
        executor_or_config: The executor or config to run in.
        func (Callable[P, Output]): The function.
        *args (Any): The positional arguments to the function.
        **kwargs (Any): The keyword arguments to the function.

    Returns:
        Output: The output of the function.

    Raises:
        RuntimeError: If the function raises a StopIteration.
    """

    def wrapper() -> T:
        try:
            return func(*args, **kwargs)
        except StopIteration as exc:
            # StopIteration can't be set on an asyncio.Future
            # it raises a TypeError and leaves the Future pending forever
            # so we need to convert it to a RuntimeError
            raise RuntimeError from exc

    loop = asyncio.get_running_loop()
    use_default_executor = executor_or_config is None or isinstance(
        executor_or_config, dict
    )
    if not use_default_executor:
        return await loop.run_in_executor(executor_or_config, wrapper)

    # Use default executor with context copied from current context
    context_bound = cast(Callable[..., T], partial(copy_context().run, wrapper))
    return await loop.run_in_executor(None, context_bound)