Source code for langchain_core.beta.runnables.context
"""Context management for runnables."""importasyncioimportthreadingfromcollectionsimportdefaultdictfromcollections.abcimportAwaitable,Mapping,Sequencefromfunctoolsimportpartialfromitertoolsimportgroupbyfromtypingimport(Any,Callable,Optional,TypeVar,Union,)frompydanticimportConfigDictfromtyping_extensionsimportoverridefromlangchain_core._api.beta_decoratorimportbetafromlangchain_core.runnables.baseimport(Runnable,RunnableSerializable,coerce_to_runnable,)fromlangchain_core.runnables.configimportRunnableConfig,ensure_config,patch_configfromlangchain_core.runnables.utilsimportConfigurableFieldSpec,Input,OutputT=TypeVar("T")Values=dict[Union[asyncio.Event,threading.Event],Any]CONTEXT_CONFIG_PREFIX="__context__/"CONTEXT_CONFIG_SUFFIX_GET="/get"CONTEXT_CONFIG_SUFFIX_SET="/set"asyncdef_asetter(done:asyncio.Event,values:Values,value:T)->T:values[done]=valuedone.set()returnvalueasyncdef_agetter(done:asyncio.Event,values:Values)->Any:awaitdone.wait()returnvalues[done]def_setter(done:threading.Event,values:Values,value:T)->T:values[done]=valuedone.set()returnvaluedef_getter(done:threading.Event,values:Values)->Any:done.wait()returnvalues[done]def_key_from_id(id_:str)->str:wout_prefix=id_.split(CONTEXT_CONFIG_PREFIX,maxsplit=1)[1]ifwout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_GET):returnwout_prefix[:-len(CONTEXT_CONFIG_SUFFIX_GET)]ifwout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_SET):returnwout_prefix[:-len(CONTEXT_CONFIG_SUFFIX_SET)]msg=f"Invalid context config id {id_}"raiseValueError(msg)def_config_with_context(config:RunnableConfig,steps:list[Runnable],setter:Callable,getter:Callable,event_cls:Union[type[threading.Event],type[asyncio.Event]],)->RunnableConfig:ifany(k.startswith(CONTEXT_CONFIG_PREFIX)forkinconfig.get("configurable",{})):returnconfigcontext_specs=[(spec,i)fori,stepinenumerate(steps)forspecinstep.config_specsifspec.id.startswith(CONTEXT_CONFIG_PREFIX)]grouped_by_key={key:list(group)forkey,groupingroupby(sorted(context_specs,key=lambdas:s[0].id),key=lambdas:_key_from_id(s[0].id),)}deps_by_key={key:{_key_from_id(dep)forspecingroupfordepin(spec[0].dependenciesor[])}forkey,groupingrouped_by_key.items()}values:Values={}events:defaultdict[str,Union[asyncio.Event,threading.Event]]=defaultdict(event_cls)context_funcs:dict[str,Callable[[],Any]]={}forkey,groupingrouped_by_key.items():getters=[sforsingroupifs[0].id.endswith(CONTEXT_CONFIG_SUFFIX_GET)]setters=[sforsingroupifs[0].id.endswith(CONTEXT_CONFIG_SUFFIX_SET)]fordepindeps_by_key[key]:ifkeyindeps_by_key[dep]:msg=f"Deadlock detected between context keys {key} and {dep}"raiseValueError(msg)iflen(setters)!=1:msg=f"Expected exactly one setter for context key {key}"raiseValueError(msg)setter_idx=setters[0][1]ifany(getter_idx<setter_idxfor_,getter_idxingetters):msg=f"Context setter for key {key} must be defined after all getters."raiseValueError(msg)ifgetters:context_funcs[getters[0][0].id]=partial(getter,events[key],values)context_funcs[setters[0][0].id]=partial(setter,events[key],values)returnpatch_config(config,configurable=context_funcs)
[docs]defaconfig_with_context(config:RunnableConfig,steps:list[Runnable],)->RunnableConfig:"""Asynchronously patch a runnable config with context getters and setters. Args: config: The runnable config. steps: The runnable steps. Returns: The patched runnable config. """return_config_with_context(config,steps,_asetter,_agetter,asyncio.Event)
[docs]defconfig_with_context(config:RunnableConfig,steps:list[Runnable],)->RunnableConfig:"""Patch a runnable config with context getters and setters. Args: config: The runnable config. steps: The runnable steps. Returns: The patched runnable config. """return_config_with_context(config,steps,_setter,_getter,threading.Event)
[docs]@beta()classContextGet(RunnableSerializable):"""Get a context value."""prefix:str=""key:Union[str,list[str]]@overridedef__str__(self)->str:returnf"ContextGet({_print_keys(self.key)})"@propertydefids(self)->list[str]:"""The context getter ids."""prefix=self.prefix+"/"ifself.prefixelse""keys=self.keyifisinstance(self.key,list)else[self.key]return[f"{CONTEXT_CONFIG_PREFIX}{prefix}{k}{CONTEXT_CONFIG_SUFFIX_GET}"forkinkeys]@property@overridedefconfig_specs(self)->list[ConfigurableFieldSpec]:returnsuper().config_specs+[ConfigurableFieldSpec(id=id_,annotation=Callable[[],Any],)forid_inself.ids]
[docs]classContext:"""Context for a runnable. The `Context` class provides methods for creating context scopes, getters, and setters within a runnable. It allows for managing and accessing contextual information throughout the execution of a program. Example: .. code-block:: python from langchain_core.beta.runnables.context import Context from langchain_core.runnables.passthrough import RunnablePassthrough from langchain_core.prompts.prompt import PromptTemplate from langchain_core.output_parsers.string import StrOutputParser from tests.unit_tests.fake.llm import FakeListLLM chain = ( Context.setter("input") | { "context": RunnablePassthrough() | Context.setter("context"), "question": RunnablePassthrough(), } | PromptTemplate.from_template("{context} {question}") | FakeListLLM(responses=["hello"]) | StrOutputParser() | { "result": RunnablePassthrough(), "context": Context.getter("context"), "input": Context.getter("input"), } ) # Use the chain output = chain.invoke("What's your name?") print(output["result"]) # Output: "hello" print(output["context"]) # Output: "What's your name?" print(output["input"]) # Output: "What's your name? """
[docs]@staticmethoddefcreate_scope(scope:str,/)->"PrefixContext":"""Create a context scope. Args: scope: The scope. Returns: The context scope. """returnPrefixContext(prefix=scope)
[docs]@staticmethoddefgetter(key:Union[str,list[str]],/)->ContextGet:"""Return a context getter. Args: key: The context getter key. """returnContextGet(key=key)
[docs]@staticmethoddefsetter(_key:Optional[str]=None,_value:Optional[SetValue]=None,/,**kwargs:SetValue,)->ContextSet:"""Return a context setter. Args: _key: The context setter key. _value: The context setter value. **kwargs: Additional context setter key-value pairs. """returnContextSet(_key,_value,prefix="",**kwargs)
[docs]classPrefixContext:"""Context for a runnable with a prefix."""prefix:str=""
[docs]def__init__(self,prefix:str=""):"""Create a prefix context. Args: prefix: The prefix. """self.prefix=prefix
[docs]defgetter(self,key:Union[str,list[str]],/)->ContextGet:"""Return a prefixed context getter. Args: key: The context getter key. """returnContextGet(key=key,prefix=self.prefix)
[docs]defsetter(self,_key:Optional[str]=None,_value:Optional[SetValue]=None,/,**kwargs:SetValue,)->ContextSet:"""Return a prefixed context setter. Args: _key: The context setter key. _value: The context setter value. **kwargs: Additional context setter key-value pairs. """returnContextSet(_key,_value,prefix=self.prefix,**kwargs)