Source code for langchain_community.callbacks.promptlayer_callback
"""Callback handler for promptlayer."""from__future__importannotationsimportdatetimefromtypingimportTYPE_CHECKING,Any,Callable,Dict,List,Optional,TuplefromuuidimportUUIDfromlangchain_core.callbacksimportBaseCallbackHandlerfromlangchain_core.messagesimport(AIMessage,BaseMessage,ChatMessage,HumanMessage,SystemMessage,)fromlangchain_core.outputsimport(ChatGeneration,LLMResult,)ifTYPE_CHECKING:importpromptlayerdef_lazy_import_promptlayer()->promptlayer:"""Lazy import promptlayer to avoid circular imports."""try:importpromptlayerexceptImportError:raiseImportError("The PromptLayerCallbackHandler requires the promptlayer package. "" Please install it with `pip install promptlayer`.")returnpromptlayer
class PromptLayerCallbackHandler(BaseCallbackHandler):
    """Callback handler for promptlayer.

    Keeps an optional id callback, a list of tags, and a dictionary of
    per-run data keyed by run UUID. Also provides helpers that turn chat
    messages into plain ``{"role": ..., "content": ...}`` dictionaries.
    """

    def __init__(
        self,
        pl_id_callback: Optional[Callable[..., Any]] = None,
        pl_tags: Optional[List[str]] = None,
    ) -> None:
        """Set up the handler state.

        Args:
            pl_id_callback: Optional callable, stored as-is on the instance.
            pl_tags: Optional list of tag strings. A missing or empty value
                is stored as a new empty list.

        Raises:
            ImportError: If the ``promptlayer`` package cannot be imported.
        """
        # Fail early when promptlayer is missing; the module itself is not
        # needed here, only the availability check.
        _lazy_import_promptlayer()

        self.pl_id_callback = pl_id_callback
        self.pl_tags = pl_tags if pl_tags else []
        self.runs: Dict[UUID, Dict[str, Any]] = {}

    def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
        """Translate a single message into a role/content dictionary.

        Human, AI and system messages map to the roles ``"user"``,
        ``"assistant"`` and ``"system"``. A ``ChatMessage`` keeps its own
        ``role``. When the message's ``additional_kwargs`` contain a
        ``"name"`` entry, it is copied into the result.

        Args:
            message: The message to convert.

        Returns:
            A dictionary with ``"role"`` and ``"content"`` keys, plus
            ``"name"`` when one is present in ``additional_kwargs``.

        Raises:
            ValueError: If the message is none of the supported types.
        """
        # Checked in this order, mirroring the precedence of the type tests.
        fixed_roles = (
            (HumanMessage, "user"),
            (AIMessage, "assistant"),
            (SystemMessage, "system"),
        )
        for message_type, role_name in fixed_roles:
            if isinstance(message, message_type):
                converted = {"role": role_name, "content": message.content}
                break
        else:
            # Not one of the fixed-role types: only ChatMessage is left.
            if not isinstance(message, ChatMessage):
                raise ValueError(f"Got unknown type {message}")
            converted = {"role": message.role, "content": message.content}

        extras = message.additional_kwargs
        if "name" in extras:
            converted["name"] = extras["name"]
        return converted

    def _create_message_dicts(
        self, messages: List[BaseMessage]
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Convert a list of messages into dictionaries.

        Args:
            messages: The messages to convert, in order.

        Returns:
            A two-tuple of the converted message dictionaries (same order as
            the input) and a parameters dictionary, which is always empty.
        """
        converted_messages = list(map(self._convert_message_to_dict, messages))
        extra_params: Dict[str, Any] = {}
        return converted_messages, extra_params