import asyncio
import threading
from collections import defaultdict
from collections.abc import Awaitable, Mapping, Sequence
from functools import partial
from itertools import groupby
from typing import (
Any,
Callable,
Optional,
TypeVar,
Union,
)
from pydantic import ConfigDict
from langchain_core._api.beta_decorator import beta
from langchain_core.runnables.base import (
Runnable,
RunnableSerializable,
coerce_to_runnable,
)
from langchain_core.runnables.config import RunnableConfig, ensure_config, patch_config
from langchain_core.runnables.utils import ConfigurableFieldSpec, Input, Output
T = TypeVar("T")
Values = dict[Union[asyncio.Event, threading.Event], Any]
CONTEXT_CONFIG_PREFIX = "__context__/"
CONTEXT_CONFIG_SUFFIX_GET = "/get"
CONTEXT_CONFIG_SUFFIX_SET = "/set"
async def _asetter(done: asyncio.Event, values: Values, value: T) -> T:
values[done] = value
done.set()
return value
async def _agetter(done: asyncio.Event, values: Values) -> Any:
await done.wait()
return values[done]
def _setter(done: threading.Event, values: Values, value: T) -> T:
values[done] = value
done.set()
return value
def _getter(done: threading.Event, values: Values) -> Any:
done.wait()
return values[done]
def _key_from_id(id_: str) -> str:
wout_prefix = id_.split(CONTEXT_CONFIG_PREFIX, maxsplit=1)[1]
if wout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_GET):
return wout_prefix[: -len(CONTEXT_CONFIG_SUFFIX_GET)]
elif wout_prefix.endswith(CONTEXT_CONFIG_SUFFIX_SET):
return wout_prefix[: -len(CONTEXT_CONFIG_SUFFIX_SET)]
else:
msg = f"Invalid context config id {id_}"
raise ValueError(msg)
def _config_with_context(
config: RunnableConfig,
steps: list[Runnable],
setter: Callable,
getter: Callable,
event_cls: Union[type[threading.Event], type[asyncio.Event]],
) -> RunnableConfig:
if any(k.startswith(CONTEXT_CONFIG_PREFIX) for k in config.get("configurable", {})):
return config
context_specs = [
(spec, i)
for i, step in enumerate(steps)
for spec in step.config_specs
if spec.id.startswith(CONTEXT_CONFIG_PREFIX)
]
grouped_by_key = {
key: list(group)
for key, group in groupby(
sorted(context_specs, key=lambda s: s[0].id),
key=lambda s: _key_from_id(s[0].id),
)
}
deps_by_key = {
key: {
_key_from_id(dep) for spec in group for dep in (spec[0].dependencies or [])
}
for key, group in grouped_by_key.items()
}
values: Values = {}
events: defaultdict[str, Union[asyncio.Event, threading.Event]] = defaultdict(
event_cls
)
context_funcs: dict[str, Callable[[], Any]] = {}
for key, group in grouped_by_key.items():
getters = [s for s in group if s[0].id.endswith(CONTEXT_CONFIG_SUFFIX_GET)]
setters = [s for s in group if s[0].id.endswith(CONTEXT_CONFIG_SUFFIX_SET)]
for dep in deps_by_key[key]:
if key in deps_by_key[dep]:
msg = f"Deadlock detected between context keys {key} and {dep}"
raise ValueError(msg)
if len(setters) != 1:
msg = f"Expected exactly one setter for context key {key}"
raise ValueError(msg)
setter_idx = setters[0][1]
if any(getter_idx < setter_idx for _, getter_idx in getters):
msg = f"Context setter for key {key} must be defined after all getters."
raise ValueError(msg)
if getters:
context_funcs[getters[0][0].id] = partial(getter, events[key], values)
context_funcs[setters[0][0].id] = partial(setter, events[key], values)
return patch_config(config, configurable=context_funcs)
[docs]
def aconfig_with_context(
config: RunnableConfig,
steps: list[Runnable],
) -> RunnableConfig:
"""Asynchronously patch a runnable config with context getters and setters.
Args:
config: The runnable config.
steps: The runnable steps.
Returns:
The patched runnable config.
"""
return _config_with_context(config, steps, _asetter, _agetter, asyncio.Event)
[docs]
def config_with_context(
config: RunnableConfig,
steps: list[Runnable],
) -> RunnableConfig:
"""Patch a runnable config with context getters and setters.
Args:
config: The runnable config.
steps: The runnable steps.
Returns:
The patched runnable config.
"""
return _config_with_context(config, steps, _setter, _getter, threading.Event)
[docs]
@beta()
class ContextGet(RunnableSerializable):
"""Get a context value."""
prefix: str = ""
key: Union[str, list[str]]
def __str__(self) -> str:
return f"ContextGet({_print_keys(self.key)})"
@property
def ids(self) -> list[str]:
prefix = self.prefix + "/" if self.prefix else ""
keys = self.key if isinstance(self.key, list) else [self.key]
return [
f"{CONTEXT_CONFIG_PREFIX}{prefix}{k}{CONTEXT_CONFIG_SUFFIX_GET}"
for k in keys
]
@property
def config_specs(self) -> list[ConfigurableFieldSpec]:
return super().config_specs + [
ConfigurableFieldSpec(
id=id_,
annotation=Callable[[], Any],
)
for id_ in self.ids
]
[docs]
def invoke(
self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Any:
config = ensure_config(config)
configurable = config.get("configurable", {})
if isinstance(self.key, list):
return {key: configurable[id_]() for key, id_ in zip(self.key, self.ids)}
else:
return configurable[self.ids[0]]()
[docs]
async def ainvoke(
self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Any:
config = ensure_config(config)
configurable = config.get("configurable", {})
if isinstance(self.key, list):
values = await asyncio.gather(*(configurable[id_]() for id_ in self.ids))
return dict(zip(self.key, values))
else:
return await configurable[self.ids[0]]()
SetValue = Union[
Runnable[Input, Output],
Callable[[Input], Output],
Callable[[Input], Awaitable[Output]],
Any,
]
def _coerce_set_value(value: SetValue) -> Runnable[Input, Output]:
if not isinstance(value, Runnable) and not callable(value):
return coerce_to_runnable(lambda _: value)
return coerce_to_runnable(value)
[docs]
@beta()
class ContextSet(RunnableSerializable):
"""Set a context value."""
prefix: str = ""
keys: Mapping[str, Optional[Runnable]]
model_config = ConfigDict(
arbitrary_types_allowed=True,
)
def __init__(
self,
key: Optional[str] = None,
value: Optional[SetValue] = None,
prefix: str = "",
**kwargs: SetValue,
):
if key is not None:
kwargs[key] = value
super().__init__( # type: ignore[call-arg]
keys={
k: _coerce_set_value(v) if v is not None else None
for k, v in kwargs.items()
},
prefix=prefix,
)
def __str__(self) -> str:
return f"ContextSet({_print_keys(list(self.keys.keys()))})"
@property
def ids(self) -> list[str]:
prefix = self.prefix + "/" if self.prefix else ""
return [
f"{CONTEXT_CONFIG_PREFIX}{prefix}{key}{CONTEXT_CONFIG_SUFFIX_SET}"
for key in self.keys
]
@property
def config_specs(self) -> list[ConfigurableFieldSpec]:
mapper_config_specs = [
s
for mapper in self.keys.values()
if mapper is not None
for s in mapper.config_specs
]
for spec in mapper_config_specs:
if spec.id.endswith(CONTEXT_CONFIG_SUFFIX_GET):
getter_key = spec.id.split("/")[1]
if getter_key in self.keys:
msg = f"Circular reference in context setter for key {getter_key}"
raise ValueError(msg)
return super().config_specs + [
ConfigurableFieldSpec(
id=id_,
annotation=Callable[[], Any],
)
for id_ in self.ids
]
[docs]
def invoke(
self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Any:
config = ensure_config(config)
configurable = config.get("configurable", {})
for id_, mapper in zip(self.ids, self.keys.values()):
if mapper is not None:
configurable[id_](mapper.invoke(input, config))
else:
configurable[id_](input)
return input
[docs]
async def ainvoke(
self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Any:
config = ensure_config(config)
configurable = config.get("configurable", {})
for id_, mapper in zip(self.ids, self.keys.values()):
if mapper is not None:
await configurable[id_](await mapper.ainvoke(input, config))
else:
await configurable[id_](input)
return input
[docs]
class Context:
"""
Context for a runnable.
The `Context` class provides methods for creating context scopes,
getters, and setters within a runnable. It allows for managing
and accessing contextual information throughout the execution
of a program.
Example:
.. code-block:: python
from langchain_core.beta.runnables.context import Context
from langchain_core.runnables.passthrough import RunnablePassthrough
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.output_parsers.string import StrOutputParser
from tests.unit_tests.fake.llm import FakeListLLM
chain = (
Context.setter("input")
| {
"context": RunnablePassthrough()
| Context.setter("context"),
"question": RunnablePassthrough(),
}
| PromptTemplate.from_template("{context} {question}")
| FakeListLLM(responses=["hello"])
| StrOutputParser()
| {
"result": RunnablePassthrough(),
"context": Context.getter("context"),
"input": Context.getter("input"),
}
)
# Use the chain
output = chain.invoke("What's your name?")
print(output["result"]) # Output: "hello"
print(output["context"]) # Output: "What's your name?"
print(output["input"]) # Output: "What's your name?
"""
[docs]
@staticmethod
def create_scope(scope: str, /) -> "PrefixContext":
"""Create a context scope.
Args:
scope: The scope.
Returns:
The context scope.
"""
return PrefixContext(prefix=scope)
[docs]
@staticmethod
def getter(key: Union[str, list[str]], /) -> ContextGet:
return ContextGet(key=key)
[docs]
@staticmethod
def setter(
_key: Optional[str] = None,
_value: Optional[SetValue] = None,
/,
**kwargs: SetValue,
) -> ContextSet:
return ContextSet(_key, _value, prefix="", **kwargs)
[docs]
class PrefixContext:
"""Context for a runnable with a prefix."""
prefix: str = ""
[docs]
def __init__(self, prefix: str = ""):
self.prefix = prefix
[docs]
def getter(self, key: Union[str, list[str]], /) -> ContextGet:
return ContextGet(key=key, prefix=self.prefix)
[docs]
def setter(
self,
_key: Optional[str] = None,
_value: Optional[SetValue] = None,
/,
**kwargs: SetValue,
) -> ContextSet:
return ContextSet(_key, _value, prefix=self.prefix, **kwargs)
def _print_keys(keys: Union[str, Sequence[str]]) -> str:
if isinstance(keys, str):
return f"'{keys}'"
else:
return ", ".join(f"'{k}'" for k in keys)