Source code for langchain_core.beta.runnables.context
import asyncio
import threading
from collections import defaultdict
from collections.abc import Awaitable, Mapping, Sequence
from functools import partial
from itertools import groupby
from typing import (
    Any,
    Callable,
    Optional,
    TypeVar,
    Union,
)

from pydantic import ConfigDict

from langchain_core._api.beta_decorator import beta
from langchain_core.runnables.base import (
    Runnable,
    RunnableSerializable,
    coerce_to_runnable,
)
from langchain_core.runnables.config import RunnableConfig, ensure_config, patch_config
from langchain_core.runnables.utils import ConfigurableFieldSpec, Input, Output

T = TypeVar("T")
# Shared store of context values, keyed by the event that signals availability.
Values = dict[Union[asyncio.Event, threading.Event], Any]

# A context config id has the form
# f"{CONTEXT_CONFIG_PREFIX}{key}{CONTEXT_CONFIG_SUFFIX_GET or _SET}".
CONTEXT_CONFIG_PREFIX = "__context__/"
CONTEXT_CONFIG_SUFFIX_GET = "/get"
CONTEXT_CONFIG_SUFFIX_SET = "/set"


async def _asetter(done: asyncio.Event, values: Values, value: T) -> T:
    """Store ``value`` under ``done``, set the event, and return ``value``."""
    values[done] = value
    done.set()
    return value


async def _agetter(done: asyncio.Event, values: Values) -> Any:
    """Await ``done`` and return the value stored under it."""
    await done.wait()
    return values[done]


def _setter(done: threading.Event, values: Values, value: T) -> T:
    """Store ``value`` under ``done``, set the event, and return ``value``."""
    values[done] = value
    done.set()
    return value


def _getter(done: threading.Event, values: Values) -> Any:
    """Block until ``done`` is set and return the value stored under it."""
    done.wait()
    return values[done]


def _key_from_id(id_: str) -> str:
    """Extract the context key from a context config id.

    Args:
        id_: An id containing ``CONTEXT_CONFIG_PREFIX`` and ending with the
            get or set suffix.

    Returns:
        The text between the prefix and the suffix.

    Raises:
        ValueError: If the id ends with neither the get nor the set suffix.
        IndexError: If the id does not contain ``CONTEXT_CONFIG_PREFIX``.
    """
    wout_prefix = id_.split(CONTEXT_CONFIG_PREFIX, maxsplit=1)[1]
    if wout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_GET):
        return wout_prefix[: -len(CONTEXT_CONFIG_SUFFIX_GET)]
    elif wout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_SET):
        return wout_prefix[: -len(CONTEXT_CONFIG_SUFFIX_SET)]
    else:
        msg = f"Invalid context config id {id_}"
        raise ValueError(msg)


def _config_with_context(
    config: RunnableConfig,
    steps: list[Runnable],
    setter: Callable,
    getter: Callable,
    event_cls: Union[type[threading.Event], type[asyncio.Event]],
) -> RunnableConfig:
    """Bind context getter/setter callables into a runnable config.

    Args:
        config: The runnable config to patch.
        steps: The runnable steps whose ``config_specs`` are scanned for
            context ids.
        setter: Callable bound as ``partial(setter, event, values)`` for each
            setter id.
        getter: Callable bound as ``partial(getter, event, values)`` for each
            getter id.
        event_cls: Factory for the per-key event shared by a key's getter and
            setter.

    Returns:
        ``config`` unchanged if its ``configurable`` mapping already holds a
        context id; otherwise the result of ``patch_config`` with the bound
        callables passed as ``configurable``.

    Raises:
        ValueError: If two keys depend on each other, a key does not have
            exactly one setter, or a getter sits at a lower step index than
            its setter.
    """
    # Context callables were already bound; do not bind them a second time.
    if any(k.startswith(CONTEXT_CONFIG_PREFIX) for k in config.get("configurable", {})):
        return config

    # (spec, step index) for every context-related spec.
    context_specs = [
        (spec, i)
        for i, step in enumerate(steps)
        for spec in step.config_specs
        if spec.id.startswith(CONTEXT_CONFIG_PREFIX)
    ]
    # groupby only merges adjacent items, so the sort must order by the same
    # key that is grouped on. Sorting by raw id alone interleaves keys when
    # one key is a "/"-prefix of another (e.g. "a" and "a/h"), which split a
    # key into several runs and kept only the last one. The id is the
    # tie-breaker so the order inside each group is still by id.
    grouped_by_key = {
        key: list(group)
        for key, group in groupby(
            sorted(
                context_specs,
                key=lambda s: (_key_from_id(s[0].id), s[0].id),
            ),
            key=lambda s: _key_from_id(s[0].id),
        )
    }
    deps_by_key = {
        key: {
            _key_from_id(dep) for spec in group for dep in (spec[0].dependencies or [])
        }
        for key, group in grouped_by_key.items()
    }

    values: Values = {}
    # One lazily created event per context key.
    events: defaultdict[str, Union[asyncio.Event, threading.Event]] = defaultdict(
        event_cls
    )
    context_funcs: dict[str, Callable[[], Any]] = {}
    for key, group in grouped_by_key.items():
        getters = [s for s in group if s[0].id.endswith(CONTEXT_CONFIG_SUFFIX_GET)]
        setters = [s for s in group if s[0].id.endswith(CONTEXT_CONFIG_SUFFIX_SET)]

        # Two keys that each depend on the other would wait on one another.
        for dep in deps_by_key[key]:
            if key in deps_by_key[dep]:
                msg = f"Deadlock detected between context keys {key} and {dep}"
                raise ValueError(msg)
        if len(setters) != 1:
            msg = f"Expected exactly one setter for context key {key}"
            raise ValueError(msg)
        setter_idx = setters[0][1]
        # NOTE(review): this rejects a getter at a lower step index than the
        # setter, although the message reads the other way round. The wording
        # is kept as-is because callers may match on it.
        if any(getter_idx < setter_idx for _, getter_idx in getters):
            msg = f"Context setter for key {key} must be defined after all getters."
            raise ValueError(msg)

        # All getters of a key share one id, so binding the first is enough.
        if getters:
            context_funcs[getters[0][0].id] = partial(getter, events[key], values)
        context_funcs[setters[0][0].id] = partial(setter, events[key], values)

    return patch_config(config, configurable=context_funcs)
def aconfig_with_context(
    config: RunnableConfig,
    steps: list[Runnable],
) -> RunnableConfig:
    """Asynchronously patch a runnable config with context getters and setters.

    Delegates to ``_config_with_context`` with the coroutine-based getter and
    setter and ``asyncio.Event`` as the event type.

    Args:
        config: The runnable config.
        steps: The runnable steps.

    Returns:
        The patched runnable config.
    """
    return _config_with_context(
        config=config,
        steps=steps,
        setter=_asetter,
        getter=_agetter,
        event_cls=asyncio.Event,
    )
def config_with_context(
    config: RunnableConfig,
    steps: list[Runnable],
) -> RunnableConfig:
    """Patch a runnable config with context getters and setters.

    Delegates to ``_config_with_context`` with the blocking getter and setter
    and ``threading.Event`` as the event type.

    Args:
        config: The runnable config.
        steps: The runnable steps.

    Returns:
        The patched runnable config.
    """
    return _config_with_context(
        config=config,
        steps=steps,
        setter=_setter,
        getter=_getter,
        event_cls=threading.Event,
    )
@beta()
class ContextGet(RunnableSerializable):
    """Get a context value."""

    # Optional scope; when non-empty it is placed, followed by "/", before
    # each key in the generated ids.
    prefix: str = ""

    # A single context key or a list of context keys.
    key: Union[str, list[str]]

    def __str__(self) -> str:
        return f"ContextGet({_print_keys(self.key)})"

    @property
    def ids(self) -> list[str]:
        """Config ids of the getters, one per key."""
        scope = f"{self.prefix}/" if self.prefix else ""
        names = [self.key] if not isinstance(self.key, list) else self.key
        return [
            f"{CONTEXT_CONFIG_PREFIX}{scope}{name}{CONTEXT_CONFIG_SUFFIX_GET}"
            for name in names
        ]

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Inherited config specs followed by one getter spec per id."""
        inherited = super().config_specs
        getter_specs = [
            ConfigurableFieldSpec(id=context_id, annotation=Callable[[], Any])
            for context_id in self.ids
        ]
        return inherited + getter_specs
@beta()
class ContextSet(RunnableSerializable):
    """Set a context value."""

    # Optional scope; when non-empty it is placed, followed by "/", before
    # each key in the generated ids.
    prefix: str = ""

    # Context key -> runnable given for that key, or None when no value was
    # given for it.
    keys: Mapping[str, Optional[Runnable]]

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    def __init__(
        self,
        key: Optional[str] = None,
        value: Optional[SetValue] = None,
        prefix: str = "",
        **kwargs: SetValue,
    ):
        """Create a context setter.

        Args:
            key: A single context key to set. When given, it is merged into
                ``kwargs`` together with ``value``.
            value: The value associated with ``key``.
            prefix: Optional scope for the keys.
            **kwargs: Additional ``key=value`` pairs to set. Each non-None
                value is passed through ``_coerce_set_value``.
        """
        if key is not None:
            kwargs[key] = value
        super().__init__(  # type: ignore[call-arg]
            keys={
                k: _coerce_set_value(v) if v is not None else None
                for k, v in kwargs.items()
            },
            prefix=prefix,
        )

    def __str__(self) -> str:
        return f"ContextSet({_print_keys(list(self.keys.keys()))})"

    @property
    def ids(self) -> list[str]:
        """Config ids of the setters, one per key."""
        prefix = self.prefix + "/" if self.prefix else ""
        return [
            f"{CONTEXT_CONFIG_PREFIX}{prefix}{key}{CONTEXT_CONFIG_SUFFIX_SET}"
            for key in self.keys
        ]

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Inherited config specs followed by one setter spec per id.

        Raises:
            ValueError: If a mapper of this setter declares a context getter
                for one of the keys this setter sets.
        """
        mapper_config_specs = [
            s
            for mapper in self.keys.values()
            if mapper is not None
            for s in mapper.config_specs
        ]
        # Compare scope-qualified keys. Taking the second "/"-separated
        # segment of the id yields the scope name, not the key, for scoped
        # ids: scoped circular references were missed and a scope named like
        # one of this setter's keys raised spuriously.
        prefix = self.prefix + "/" if self.prefix else ""
        own_keys = {f"{prefix}{key}" for key in self.keys}
        for spec in mapper_config_specs:
            # _key_from_id needs the context prefix to be present in the id.
            if spec.id.startswith(CONTEXT_CONFIG_PREFIX) and spec.id.endswith(
                CONTEXT_CONFIG_SUFFIX_GET
            ):
                getter_key = _key_from_id(spec.id)
                if getter_key in own_keys:
                    msg = f"Circular reference in context setter for key {getter_key}"
                    raise ValueError(msg)
        return super().config_specs + [
            ConfigurableFieldSpec(
                id=id_,
                annotation=Callable[[], Any],
            )
            for id_ in self.ids
        ]
class Context:
    """Context for a runnable.

    The `Context` class provides methods for creating context scopes,
    getters, and setters within a runnable. It allows for managing
    and accessing contextual information throughout the execution
    of a program.

    Example:
        .. code-block:: python

            from langchain_core.beta.runnables.context import Context
            from langchain_core.runnables.passthrough import RunnablePassthrough
            from langchain_core.prompts.prompt import PromptTemplate
            from langchain_core.output_parsers.string import StrOutputParser
            from tests.unit_tests.fake.llm import FakeListLLM

            chain = (
                Context.setter("input")
                | {
                    "context": RunnablePassthrough() | Context.setter("context"),
                    "question": RunnablePassthrough(),
                }
                | PromptTemplate.from_template("{context} {question}")
                | FakeListLLM(responses=["hello"])
                | StrOutputParser()
                | {
                    "result": RunnablePassthrough(),
                    "context": Context.getter("context"),
                    "input": Context.getter("input"),
                }
            )

            # Use the chain
            output = chain.invoke("What's your name?")
            print(output["result"])  # Output: "hello"
            print(output["context"])  # Output: "What's your name?"
            print(output["input"])  # Output: "What's your name?"
    """

    @staticmethod
    def create_scope(scope: str, /) -> "PrefixContext":
        """Create a context scope.

        Args:
            scope: The scope name (positional-only). It is passed as the
                ``prefix`` of the returned ``PrefixContext``.

        Returns:
            The context scope.
        """
        return PrefixContext(prefix=scope)