Source code for langchain_community.chat_message_histories.zep_cloud

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
)

if TYPE_CHECKING:
    from zep_cloud import (
        Memory,
        MemoryGetRequestMemoryType,
        MemorySearchResult,
        Message,
        NotFoundError,
        RoleType,
        SearchScope,
        SearchType,
    )

logger = logging.getLogger(__name__)


[docs] def condense_zep_memory_into_human_message(zep_memory: Memory) -> BaseMessage: """Condense Zep memory into a human message. Args: zep_memory: The Zep memory object. Returns: BaseMessage: The human message. """ prompt = "" if zep_memory.facts: prompt = "\n".join(zep_memory.facts) if zep_memory.summary and zep_memory.summary.content: prompt += "\n" + zep_memory.summary.content for msg in zep_memory.messages or []: prompt += f"\n{msg.role or msg.role_type}: {msg.content}" return HumanMessage(content=prompt)
[docs] def get_zep_message_role_type(role: str) -> RoleType: """Get the Zep role type from the role string. Args: role: The role string. One of "human", "ai", "system", "function", "tool". Returns: RoleType: The Zep role type. One of "user", "assistant", "system", "function", "tool". """ if role == "human": return "user" elif role == "ai": return "assistant" elif role == "system": return "system" elif role == "function": return "function" elif role == "tool": return "tool" else: return "system"
[docs] class ZepCloudChatMessageHistory(BaseChatMessageHistory): """Chat message history that uses Zep Cloud as a backend. Recommended usage:: # Set up Zep Chat History zep_chat_history = ZepChatMessageHistory( session_id=session_id, api_key=<your_api_key>, ) # Use a standard ConversationBufferMemory to encapsulate the Zep chat history memory = ConversationBufferMemory( memory_key="chat_history", chat_memory=zep_chat_history ) Zep - Recall, understand, and extract data from chat histories. Power personalized AI experiences. Zep is a long-term memory service for AI Assistant apps. With Zep, you can provide AI assistants with the ability to recall past conversations, no matter how distant, while also reducing hallucinations, latency, and cost. see Zep Cloud Docs: https://help.getzep.com This class is a thin wrapper around the zep-python package. Additional Zep functionality is exposed via the `zep_summary`, `zep_messages` and `zep_facts` properties. For more information on the zep-python package, see: https://github.com/getzep/zep-python """
[docs] def __init__( self, session_id: str, api_key: str, *, memory_type: Optional[MemoryGetRequestMemoryType] = None, lastn: Optional[int] = None, ai_prefix: Optional[str] = None, human_prefix: Optional[str] = None, summary_instruction: Optional[str] = None, ) -> None: try: from zep_cloud.client import AsyncZep, Zep except ImportError: raise ImportError( "Could not import zep-cloud package. " "Please install it with `pip install zep-cloud`." ) self.zep_client = Zep(api_key=api_key) self.zep_client_async = AsyncZep(api_key=api_key) self.session_id = session_id self.memory_type = memory_type or "perpetual" self.lastn = lastn self.ai_prefix = ai_prefix or "ai" self.human_prefix = human_prefix or "human" self.summary_instruction = summary_instruction
@property def messages(self) -> List[BaseMessage]: # type: ignore """Retrieve messages from Zep memory""" zep_memory: Optional[Memory] = self._get_memory() if not zep_memory: return [] return [condense_zep_memory_into_human_message(zep_memory)] @property def zep_messages(self) -> List[Message]: """Retrieve summary from Zep memory""" zep_memory: Optional[Memory] = self._get_memory() if not zep_memory: return [] return zep_memory.messages or [] @property def zep_summary(self) -> Optional[str]: """Retrieve summary from Zep memory""" zep_memory: Optional[Memory] = self._get_memory() if not zep_memory or not zep_memory.summary: return None return zep_memory.summary.content @property def zep_facts(self) -> Optional[List[str]]: """Retrieve conversation facts from Zep memory""" if self.memory_type != "perpetual": return None zep_memory: Optional[Memory] = self._get_memory() if not zep_memory or not zep_memory.facts: return None return zep_memory.facts def _get_memory(self) -> Optional[Memory]: """Retrieve memory from Zep""" from zep_cloud import NotFoundError try: zep_memory: Memory = self.zep_client.memory.get( self.session_id, memory_type=self.memory_type, lastn=self.lastn ) except NotFoundError: logger.warning( f"Session {self.session_id} not found in Zep. Returning None" ) return None return zep_memory
[docs] def add_user_message( # type: ignore[override] self, message: str, metadata: Optional[Dict[str, Any]] = None ) -> None: """Convenience method for adding a human message string to the store. Args: message: The string contents of a human message. metadata: Optional metadata to attach to the message. """ self.add_message(HumanMessage(content=message), metadata=metadata)
[docs] def add_ai_message( # type: ignore[override] self, message: str, metadata: Optional[Dict[str, Any]] = None ) -> None: """Convenience method for adding an AI message string to the store. Args: message: The string contents of an AI message. metadata: Optional metadata to attach to the message. """ self.add_message(AIMessage(content=message), metadata=metadata)
[docs] def add_message( self, message: BaseMessage, metadata: Optional[Dict[str, Any]] = None ) -> None: """Append the message to the Zep memory history""" from zep_cloud import Message self.zep_client.memory.add( self.session_id, messages=[ Message( content=str(message.content), role=message.type, role_type=get_zep_message_role_type(message.type), metadata=metadata, ) ], )
[docs] def add_messages(self, messages: Sequence[BaseMessage]) -> None: """Append the messages to the Zep memory history""" from zep_cloud import Message zep_messages = [ Message( content=str(message.content), role=message.type, role_type=get_zep_message_role_type(message.type), metadata=message.additional_kwargs.get("metadata", None), ) for message in messages ] self.zep_client.memory.add(self.session_id, messages=zep_messages)
[docs] async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None: """Append the messages to the Zep memory history asynchronously""" from zep_cloud import Message zep_messages = [ Message( content=str(message.content), role=message.type, role_type=get_zep_message_role_type(message.type), metadata=message.additional_kwargs.get("metadata", None), ) for message in messages ] await self.zep_client_async.memory.add(self.session_id, messages=zep_messages)
[docs] def search( self, query: str, metadata: Optional[Dict] = None, search_scope: SearchScope = "messages", search_type: SearchType = "similarity", mmr_lambda: Optional[float] = None, limit: Optional[int] = None, ) -> List[MemorySearchResult]: """Search Zep memory for messages matching the query""" return self.zep_client.memory.search( self.session_id, text=query, metadata=metadata, search_scope=search_scope, search_type=search_type, mmr_lambda=mmr_lambda, limit=limit, )
[docs] def clear(self) -> None: """Clear session memory from Zep. Note that Zep is long-term storage for memory and this is not advised unless you have specific data retention requirements. """ try: self.zep_client.memory.delete(self.session_id) except NotFoundError: logger.warning( f"Session {self.session_id} not found in Zep. Skipping delete." )
[docs] async def aclear(self) -> None: """Clear session memory from Zep asynchronously. Note that Zep is long-term storage for memory and this is not advised unless you have specific data retention requirements. """ try: await self.zep_client_async.memory.delete(self.session_id) except NotFoundError: logger.warning( f"Session {self.session_id} not found in Zep. Skipping delete." )