Source code for langchain_core.runnables.history

from __future__ import annotations

import inspect
from collections.abc import Sequence
from types import GenericAlias
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Optional,
    Union,
)

from pydantic import BaseModel
from typing_extensions import override

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.load.load import load
from langchain_core.runnables.base import Runnable, RunnableBindingBase, RunnableLambda
from langchain_core.runnables.passthrough import RunnablePassthrough
from langchain_core.runnables.utils import (
    ConfigurableFieldSpec,
    Output,
    get_unique_config_specs,
)
from langchain_core.utils.pydantic import create_model_v2

if TYPE_CHECKING:
    from langchain_core.language_models.base import LanguageModelLike
    from langchain_core.messages.base import BaseMessage
    from langchain_core.runnables.config import RunnableConfig
    from langchain_core.tracers.schemas import Run


MessagesOrDictWithMessages = Union[Sequence["BaseMessage"], dict[str, Any]]
GetSessionHistoryCallable = Callable[..., BaseChatMessageHistory]


class RunnableWithMessageHistory(RunnableBindingBase):
    """Runnable that manages chat message history for another Runnable.

    A chat message history is a sequence of messages that represent a conversation.

    RunnableWithMessageHistory wraps another Runnable and manages the chat message
    history for it; it is responsible for reading and updating the chat message
    history.

    The formats supported for the inputs and outputs of the wrapped Runnable are
    described below.

    RunnableWithMessageHistory must always be called with a config that contains
    the appropriate parameters for the chat message history factory.

    By default, the Runnable is expected to take a single configuration parameter
    called ``session_id``, a string. This parameter is used to create a new, or
    look up an existing, chat message history that matches the given session_id.
    In this case, the invocation would look like this:

    ``with_history.invoke(..., config={"configurable": {"session_id": "<SESSION_ID>"}})``

    The configuration can be customized by passing in a list of
    ``ConfigurableFieldSpec`` objects to the ``history_factory_config`` parameter
    (see the example below).

    In the examples, we will use a chat message history with an in-memory
    implementation to make it easy to experiment and see the results. For
    production use cases, you will want to use a persistent implementation of
    chat message history, such as ``RedisChatMessageHistory``.

    Parameters:
        get_session_history: Function that returns a new BaseChatMessageHistory.
            This function should take a single positional argument ``session_id``
            of type string and return a corresponding chat message history
            instance.
        input_messages_key: Must be specified if the base Runnable accepts a dict
            as input. The key in the input dict that contains the messages.
        output_messages_key: Must be specified if the base Runnable returns a dict
            as output. The key in the output dict that contains the messages.
        history_messages_key: Must be specified if the base Runnable accepts a dict
            as input and expects a separate key for historical messages.
        history_factory_config: Configure fields that should be passed to the
            chat history factory. See ``ConfigurableFieldSpec`` for more details.

    Example: Chat message history with an in-memory implementation for testing.

    .. code-block:: python

        from typing import List

        from pydantic import BaseModel, Field

        from langchain_core.chat_history import BaseChatMessageHistory
        from langchain_core.messages import AIMessage, BaseMessage
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
        from langchain_core.runnables import ConfigurableFieldSpec
        from langchain_core.runnables.history import RunnableWithMessageHistory


        class InMemoryHistory(BaseChatMessageHistory, BaseModel):
            \"\"\"In memory implementation of chat message history.\"\"\"

            messages: List[BaseMessage] = Field(default_factory=list)

            def add_messages(self, messages: List[BaseMessage]) -> None:
                \"\"\"Add a list of messages to the store.\"\"\"
                self.messages.extend(messages)

            def clear(self) -> None:
                self.messages = []


        # Here we use a global variable to store the chat message history.
        # This will make it easier to inspect it to see the underlying results.
        store = {}


        def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
            if session_id not in store:
                store[session_id] = InMemoryHistory()
            return store[session_id]


        history = get_by_session_id("1")
        history.add_message(AIMessage(content="hello"))
        print(store)  # noqa: T201

    Example where the wrapped Runnable takes a dictionary input:

    .. code-block:: python

        from langchain_community.chat_models import ChatAnthropic
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
        from langchain_core.runnables.history import RunnableWithMessageHistory

        prompt = ChatPromptTemplate.from_messages([
            ("system", "You're an assistant who's good at {ability}"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{question}"),
        ])

        chain = prompt | ChatAnthropic(model="claude-2")

        chain_with_history = RunnableWithMessageHistory(
            chain,
            # Uses the get_by_session_id function defined in the example above.
            get_by_session_id,
            input_messages_key="question",
            history_messages_key="history",
        )

        print(chain_with_history.invoke(  # noqa: T201
            {"ability": "math", "question": "What does cosine mean?"},
            config={"configurable": {"session_id": "foo"}}
        ))

        # Uses the store defined in the example above.
        print(store)  # noqa: T201

        print(chain_with_history.invoke(  # noqa: T201
            {"ability": "math", "question": "What's its inverse?"},
            config={"configurable": {"session_id": "foo"}}
        ))

        print(store)  # noqa: T201

    Example where the session factory takes two keys (user_id and conversation_id):

    .. code-block:: python

        store = {}


        def get_session_history(
            user_id: str, conversation_id: str
        ) -> BaseChatMessageHistory:
            if (user_id, conversation_id) not in store:
                store[(user_id, conversation_id)] = InMemoryHistory()
            return store[(user_id, conversation_id)]


        prompt = ChatPromptTemplate.from_messages([
            ("system", "You're an assistant who's good at {ability}"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{question}"),
        ])

        chain = prompt | ChatAnthropic(model="claude-2")

        with_message_history = RunnableWithMessageHistory(
            chain,
            get_session_history=get_session_history,
            input_messages_key="question",
            history_messages_key="history",
            history_factory_config=[
                ConfigurableFieldSpec(
                    id="user_id",
                    annotation=str,
                    name="User ID",
                    description="Unique identifier for the user.",
                    default="",
                    is_shared=True,
                ),
                ConfigurableFieldSpec(
                    id="conversation_id",
                    annotation=str,
                    name="Conversation ID",
                    description="Unique identifier for the conversation.",
                    default="",
                    is_shared=True,
                ),
            ],
        )

        with_message_history.invoke(
            {"ability": "math", "question": "What does cosine mean?"},
            config={"configurable": {"user_id": "123", "conversation_id": "1"}}
        )
    """

    get_session_history: GetSessionHistoryCallable
    input_messages_key: Optional[str] = None
    output_messages_key: Optional[str] = None
    history_messages_key: Optional[str] = None
    history_factory_config: Sequence[ConfigurableFieldSpec]

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def __init__(
        self,
        runnable: Union[
            Runnable[
                MessagesOrDictWithMessages,
                Union[str, BaseMessage, MessagesOrDictWithMessages],
            ],
            LanguageModelLike,
        ],
        get_session_history: GetSessionHistoryCallable,
        *,
        input_messages_key: Optional[str] = None,
        output_messages_key: Optional[str] = None,
        history_messages_key: Optional[str] = None,
        history_factory_config: Optional[Sequence[ConfigurableFieldSpec]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize RunnableWithMessageHistory.

        Args:
            runnable: The base Runnable to be wrapped. Must take as input one of:

                1. A sequence of BaseMessages
                2. A dict with one key for all messages
                3. A dict with one key for the current input string/message(s)
                   and a separate key for historical messages. If the input key
                   points to a string, it will be treated as a HumanMessage in
                   history.

                Must return as output one of:

                1. A string which can be treated as an AIMessage
                2. A BaseMessage or sequence of BaseMessages
                3. A dict with a key for a BaseMessage or sequence of BaseMessages

            get_session_history: Function that returns a new BaseChatMessageHistory.
                This function should either take a single positional argument
                ``session_id`` of type string and return a corresponding chat
                message history instance:

                .. code-block:: python

                    def get_session_history(
                        session_id: str,
                        *,
                        user_id: Optional[str] = None,
                    ) -> BaseChatMessageHistory: ...

                Or it should take keyword arguments that match the keys of
                ``history_factory_config`` and return a corresponding chat
                message history instance:

                .. code-block:: python

                    def get_session_history(
                        *,
                        user_id: str,
                        thread_id: str,
                    ) -> BaseChatMessageHistory: ...

            input_messages_key: Must be specified if the base Runnable accepts
                a dict as input. Default is None.
            output_messages_key: Must be specified if the base Runnable returns
                a dict as output. Default is None.
            history_messages_key: Must be specified if the base Runnable accepts
                a dict as input and expects a separate key for historical
                messages.
            history_factory_config: Configure fields that should be passed to the
                chat history factory. See ``ConfigurableFieldSpec`` for more
                details. Specifying these allows you to pass multiple config keys
                into the get_session_history factory.
            **kwargs: Arbitrary additional kwargs to pass to the parent class
                ``RunnableBindingBase`` init.
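
        Example:
            A minimal sketch of wrapping a Runnable that takes a sequence of
            messages directly (so no message keys are needed), using the
            in-memory ``InMemoryChatMessageHistory`` from
            ``langchain_core.chat_history`` as the history backend:

            .. code-block:: python

                from langchain_core.chat_history import InMemoryChatMessageHistory
                from langchain_core.messages import AIMessage, HumanMessage
                from langchain_core.runnables import RunnableLambda
                from langchain_core.runnables.history import RunnableWithMessageHistory

                store = {}

                def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
                    if session_id not in store:
                        store[session_id] = InMemoryChatMessageHistory()
                    return store[session_id]

                # A stand-in "model": receives the full message list (history
                # plus the new input) and replies with a message count.
                echo = RunnableLambda(
                    lambda messages: AIMessage(content=f"{len(messages)} message(s) seen")
                )

                with_history = RunnableWithMessageHistory(echo, get_session_history)
                with_history.invoke(
                    [HumanMessage(content="hi")],
                    config={"configurable": {"session_id": "abc"}},
                )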
""" history_chain: Runnable = RunnableLambda( self._enter_history, self._aenter_history ).with_config(run_name="load_history") messages_key = history_messages_key or input_messages_key if messages_key: history_chain = RunnablePassthrough.assign( **{messages_key: history_chain} ).with_config(run_name="insert_history") runnable_sync: Runnable = runnable.with_listeners(on_end=self._exit_history) runnable_async: Runnable = runnable.with_alisteners(on_end=self._aexit_history) def _call_runnable_sync(_input: Any) -> Runnable: return runnable_sync async def _call_runnable_async(_input: Any) -> Runnable: return runnable_async bound: Runnable = ( history_chain | RunnableLambda( _call_runnable_sync, _call_runnable_async, ).with_config(run_name="check_sync_or_async") ).with_config(run_name="RunnableWithMessageHistory") if history_factory_config: _config_specs = history_factory_config else: # If not provided, then we'll use the default session_id field _config_specs = [ ConfigurableFieldSpec( id="session_id", annotation=str, name="Session ID", description="Unique identifier for a session.", default="", is_shared=True, ), ] super().__init__( get_session_history=get_session_history, input_messages_key=input_messages_key, output_messages_key=output_messages_key, bound=bound, history_messages_key=history_messages_key, history_factory_config=_config_specs, **kwargs, ) self._history_chain = history_chain @property def config_specs(self) -> list[ConfigurableFieldSpec]: """Get the configuration specs for the RunnableWithMessageHistory.""" return get_unique_config_specs( super().config_specs + list(self.history_factory_config) ) def get_input_schema( self, config: Optional[RunnableConfig] = None ) -> type[BaseModel]: from langchain_core.messages import BaseMessage fields: dict = {} if self.input_messages_key and self.history_messages_key: fields[self.input_messages_key] = ( Union[str, BaseMessage, Sequence[BaseMessage]], ..., ) elif self.input_messages_key: fields[self.input_messages_key] = (Sequence[BaseMessage], ...) else: return create_model_v2( "RunnableWithChatHistoryInput", module_name=self.__class__.__module__, root=(Sequence[BaseMessage], ...), ) return create_model_v2( # type: ignore[call-overload] "RunnableWithChatHistoryInput", field_definitions=fields, module_name=self.__class__.__module__, ) @property @override def OutputType(self) -> type[Output]: output_type = self._history_chain.OutputType return output_type def get_output_schema( self, config: Optional[RunnableConfig] = None ) -> type[BaseModel]: """Get a pydantic model that can be used to validate output to the Runnable. Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with. This method allows to get an output schema for a specific configuration. Args: config: A config to use when generating the schema. Returns: A pydantic model that can be used to validate output. 
""" root_type = self.OutputType if ( inspect.isclass(root_type) and not isinstance(root_type, GenericAlias) and issubclass(root_type, BaseModel) ): return root_type return create_model_v2( "RunnableWithChatHistoryOutput", root=root_type, module_name=self.__class__.__module__, ) def _is_not_async(self, *args: Sequence[Any], **kwargs: dict[str, Any]) -> bool: return False async def _is_async(self, *args: Sequence[Any], **kwargs: dict[str, Any]) -> bool: return True def _get_input_messages( self, input_val: Union[str, BaseMessage, Sequence[BaseMessage], dict] ) -> list[BaseMessage]: from langchain_core.messages import BaseMessage # If dictionary, try to pluck the single key representing messages if isinstance(input_val, dict): if self.input_messages_key: key = self.input_messages_key elif len(input_val) == 1: key = list(input_val.keys())[0] else: key = "input" input_val = input_val[key] # If value is a string, convert to a human message if isinstance(input_val, str): from langchain_core.messages import HumanMessage return [HumanMessage(content=input_val)] # If value is a single message, convert to a list elif isinstance(input_val, BaseMessage): return [input_val] # If value is a list or tuple... elif isinstance(input_val, (list, tuple)): # Handle empty case if len(input_val) == 0: return list(input_val) # If is a list of list, then return the first value # This occurs for chat models - since we batch inputs if isinstance(input_val[0], list): if len(input_val) != 1: msg = f"Expected a single list of messages. Got {input_val}." raise ValueError(msg) return input_val[0] return list(input_val) else: msg = ( f"Expected str, BaseMessage, List[BaseMessage], or Tuple[BaseMessage]. " f"Got {input_val}." ) raise ValueError(msg) def _get_output_messages( self, output_val: Union[str, BaseMessage, Sequence[BaseMessage], dict] ) -> list[BaseMessage]: from langchain_core.messages import BaseMessage # If dictionary, try to pluck the single key representing messages if isinstance(output_val, dict): if self.output_messages_key: key = self.output_messages_key elif len(output_val) == 1: key = list(output_val.keys())[0] else: key = "output" # If you are wrapping a chat model directly # The output is actually this weird generations object if key not in output_val and "generations" in output_val: output_val = output_val["generations"][0][0]["message"] else: output_val = output_val[key] if isinstance(output_val, str): from langchain_core.messages import AIMessage return [AIMessage(content=output_val)] # If value is a single message, convert to a list elif isinstance(output_val, BaseMessage): return [output_val] elif isinstance(output_val, (list, tuple)): return list(output_val) else: msg = ( f"Expected str, BaseMessage, List[BaseMessage], or Tuple[BaseMessage]. " f"Got {output_val}." 
        )
        raise ValueError(msg)

    def _enter_history(self, input: Any, config: RunnableConfig) -> list[BaseMessage]:
        hist: BaseChatMessageHistory = config["configurable"]["message_history"]
        messages = hist.messages.copy()

        if not self.history_messages_key:
            # Return all messages, with the current input appended.
            input_val = (
                input if not self.input_messages_key else input[self.input_messages_key]
            )
            messages += self._get_input_messages(input_val)
        return messages

    async def _aenter_history(
        self, input: dict[str, Any], config: RunnableConfig
    ) -> list[BaseMessage]:
        hist: BaseChatMessageHistory = config["configurable"]["message_history"]
        messages = (await hist.aget_messages()).copy()

        if not self.history_messages_key:
            # Return all messages, with the current input appended.
            input_val = (
                input if not self.input_messages_key else input[self.input_messages_key]
            )
            messages += self._get_input_messages(input_val)
        return messages

    def _exit_history(self, run: Run, config: RunnableConfig) -> None:
        hist: BaseChatMessageHistory = config["configurable"]["message_history"]

        # Get the input messages.
        inputs = load(run.inputs)
        input_messages = self._get_input_messages(inputs)
        # If historic messages were prepended to the input messages, remove them
        # to avoid adding duplicate messages to history.
        if not self.history_messages_key:
            historic_messages = config["configurable"]["message_history"].messages
            input_messages = input_messages[len(historic_messages) :]

        # Get the output messages.
        output_val = load(run.outputs)
        output_messages = self._get_output_messages(output_val)
        hist.add_messages(input_messages + output_messages)

    async def _aexit_history(self, run: Run, config: RunnableConfig) -> None:
        hist: BaseChatMessageHistory = config["configurable"]["message_history"]

        # Get the input messages.
        inputs = load(run.inputs)
        input_messages = self._get_input_messages(inputs)
        # If historic messages were prepended to the input messages, remove them
        # to avoid adding duplicate messages to history.
        if not self.history_messages_key:
            historic_messages = await hist.aget_messages()
            input_messages = input_messages[len(historic_messages) :]

        # Get the output messages.
        output_val = load(run.outputs)
        output_messages = self._get_output_messages(output_val)
        await hist.aadd_messages(input_messages + output_messages)

    def _merge_configs(self, *configs: Optional[RunnableConfig]) -> RunnableConfig:
        config = super()._merge_configs(*configs)
        expected_keys = [field_spec.id for field_spec in self.history_factory_config]

        configurable = config.get("configurable", {})

        missing_keys = set(expected_keys) - set(configurable.keys())
        parameter_names = _get_parameter_names(self.get_session_history)

        if missing_keys and parameter_names:
            example_input = {self.input_messages_key: "foo"}
            example_configurable = {
                missing_key: "[your-value-here]" for missing_key in missing_keys
            }
            example_config = {"configurable": example_configurable}
            msg = (
                f"Missing keys {sorted(missing_keys)} in config['configurable']. "
                f"Expected keys are {sorted(expected_keys)}. "
f"When using via .invoke() or .stream(), pass in a config; " f"e.g., chain.invoke({example_input}, {example_config})" ) raise ValueError(msg) if len(expected_keys) == 1: if parameter_names: # If arity = 1, then invoke function by positional arguments message_history = self.get_session_history( configurable[expected_keys[0]] ) else: if not config: config["configurable"] = {} message_history = self.get_session_history() else: # otherwise verify that names of keys patch and invoke by named arguments if set(expected_keys) != set(parameter_names): msg = ( f"Expected keys {sorted(expected_keys)} do not match parameter " f"names {sorted(parameter_names)} of get_session_history." ) raise ValueError(msg) message_history = self.get_session_history( **{key: configurable[key] for key in expected_keys} ) config["configurable"]["message_history"] = message_history return config

def _get_parameter_names(callable_: GetSessionHistoryCallable) -> list[str]:
    """Get the parameter names of the callable."""
    sig = inspect.signature(callable_)
    return list(sig.parameters.keys())
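
# For example, given the factories from the class docstring:
#
#     _get_parameter_names(get_by_session_id)    # -> ["session_id"]
#     _get_parameter_names(get_session_history)  # -> ["user_id", "conversation_id"]
#
# These names are what _merge_configs compares against the configured field ids.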