def identify(user_id: str, user_props: Any = None) -> UserContextManager:
    """Build an LLMonitor ``UserContextManager``.

    Parameters:
        - `user_id`: The user id.
        - `user_props`: The user properties.

    Returns:
        A context manager that sets the user context.
    """
    context = UserContextManager(user_id, user_props)
    return context
def _serialize(obj: Any) -> Union[Dict[str, Any], List[Any], Any]:
    """Recursively convert *obj* into JSON-friendly data.

    Objects exposing a ``to_json`` method are serialized through it; dicts
    and lists are walked recursively; anything else is returned unchanged.
    """
    if hasattr(obj, "to_json"):
        return obj.to_json()
    if isinstance(obj, dict):
        return {k: _serialize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_serialize(item) for item in obj]
    return obj


def _parse_input(raw_input: Any) -> Any:
    """Extract the most meaningful "input" payload from a raw callback input."""
    if not raw_input:
        return None

    # if it's an array of 1, just parse the first element
    if isinstance(raw_input, list) and len(raw_input) == 1:
        return _parse_input(raw_input[0])

    if not isinstance(raw_input, dict):
        return _serialize(raw_input)

    # Well-known keys, checked in priority order; first truthy value wins.
    for key in ("input", "inputs", "question", "query"):
        candidate = raw_input.get(key)
        if candidate:
            return candidate

    return _serialize(raw_input)


def _parse_output(raw_output: dict) -> Any:
    """Extract the most meaningful "output" payload from a raw callback output."""
    if not raw_output:
        return None

    if not isinstance(raw_output, dict):
        return _serialize(raw_output)

    # Well-known keys, checked in priority order; first truthy value wins.
    for key in ("text", "answer", "output", "output_text", "result"):
        candidate = raw_output.get(key)
        if candidate:
            return candidate

    return _serialize(raw_output)


def _parse_lc_role(role: str) -> str:
    """Map LangChain's "human" role to LLMonitor's "user"; pass others through."""
    return "user" if role == "human" else role


def _get_user_id(metadata: Any) -> Any:
    """Resolve the user id: context variable first, then metadata keys."""
    ctx_user = user_ctx.get()
    if ctx_user is not None:
        return ctx_user

    meta = metadata or {}
    uid = meta.get("user_id")
    if uid is None:
        uid = meta.get("userId")  # legacy, to delete in the future
    return uid


def _get_user_props(metadata: Any) -> Any:
    """Resolve user properties: context variable first, then metadata."""
    ctx_props = user_props_ctx.get()
    if ctx_props is not None:
        return ctx_props
    return (metadata or {}).get("user_props", None)


def _parse_lc_message(message: BaseMessage) -> Dict[str, Any]:
    """Flatten a LangChain message into a plain dict for the LLMonitor API."""
    parsed: Dict[str, Any] = {
        "text": message.content,
        "role": _parse_lc_role(message.type),
    }
    # Forward tool/function metadata only when present on the message.
    for key in ("function_call", "tool_calls", "tool_call_id", "name"):
        value = message.additional_kwargs.get(key)
        if value is not None:
            parsed[key] = cast(Any, value)
    return parsed


def _parse_lc_messages(messages: Union[List[BaseMessage], Any]) -> List[Dict[str, Any]]:
    """Parse a sequence of LangChain messages into plain dicts."""
    return [_parse_lc_message(msg) for msg in messages]
class LLMonitorCallbackHandler(BaseCallbackHandler):
    """Callback Handler for LLMonitor.

    #### Parameters:
        - `app_id`: The app id of the app you want to report to. Defaults to
        `None`, which means that `LLMONITOR_APP_ID` will be used.
        - `api_url`: The url of the LLMonitor API. Defaults to `None`,
        which means that either `LLMONITOR_API_URL` environment variable
        or `https://app.llmonitor.com` will be used.

    #### Raises:
        - `ValueError`: if `app_id` is not provided either as an
        argument or as an environment variable.
        - `ConnectionError`: if the connection to the API fails.

    #### Example:
    ```python
    from langchain_community.llms import OpenAI
    from langchain_community.callbacks import LLMonitorCallbackHandler

    llmonitor_callback = LLMonitorCallbackHandler()
    llm = OpenAI(callbacks=[llmonitor_callback],
                 metadata={"userId": "user-123"})
    llm.invoke("Hello, how are you?")
    ```
    """

    __api_url: str
    __app_id: str
    __verbose: bool
    __llmonitor_version: str
    # When False, every callback becomes a no-op instead of raising.
    __has_valid_config: bool

    def __init__(
        self,
        app_id: Union[str, None] = None,
        api_url: Union[str, None] = None,
        verbose: bool = False,
    ) -> None:
        super().__init__()

        self.__has_valid_config = True

        try:
            import llmonitor

            self.__llmonitor_version = importlib.metadata.version("llmonitor")
            self.__track_event = llmonitor.track_event
        except ImportError:
            logger.warning(
                """[LLMonitor] To use the LLMonitor callback handler you need to
                have the `llmonitor` Python package installed. Please install it
                with `pip install llmonitor`"""
            )
            self.__has_valid_config = False
            return

        if parse(self.__llmonitor_version) < parse("0.0.32"):
            logger.warning(
                f"""[LLMonitor] The installed `llmonitor` version is
                {self.__llmonitor_version}
                but `LLMonitorCallbackHandler` requires at least version 0.0.32
                upgrade `llmonitor` with `pip install --upgrade llmonitor`"""
            )
            # BUG FIX: the config was previously reset to True right after this
            # assignment, which made the minimum-version check a no-op.
            self.__has_valid_config = False

        self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL
        self.__verbose = verbose or bool(os.getenv("LLMONITOR_VERBOSE"))

        _app_id = app_id or os.getenv("LLMONITOR_APP_ID")
        if _app_id is None:
            logger.warning(
                """[LLMonitor] app_id must be provided either as an argument or
                as an environment variable"""
            )
            self.__has_valid_config = False
        else:
            self.__app_id = _app_id

        if self.__has_valid_config is False:
            return None

        # Best-effort connectivity check; a failure only logs a warning so the
        # handler stays usable (events are sent asynchronously later anyway).
        try:
            res = requests.get(f"{self.__api_url}/api/app/{self.__app_id}")
            if not res.ok:
                raise ConnectionError()
        except Exception:
            logger.warning(
                f"""[LLMonitor] Could not connect to the LLMonitor API at
                {self.__api_url}"""
            )

    def __get_model_params(
        self, serialized: Dict[str, Any], kwargs: Dict[str, Any]
    ):
        """Return ``(name, extra)`` extracted from the invocation params.

        Shared by `on_llm_start` and `on_chat_model_start`, which previously
        duplicated this logic.
        """
        params = kwargs.get("invocation_params", {})
        params.update(serialized.get("kwargs", {}))

        name = (
            params.get("model")
            or params.get("model_name")
            or params.get("model_id")
        )

        # Sometimes, for example with ChatAnthropic, `invocation_params` is
        # empty. BUG FIX: `_type` may be missing; `or ""` avoids a TypeError
        # from `"anthropic" in None`.
        if not name and "anthropic" in (params.get("_type") or ""):
            name = "claude-2"

        extra = {
            param: params.get(param)
            for param in PARAMS_TO_CAPTURE
            if params.get(param) is not None
        }
        return name, extra

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        tags: Union[List[str], None] = None,
        metadata: Union[Dict[str, Any], None] = None,
        **kwargs: Any,
    ) -> None:
        """Report the start of a (non-chat) LLM call to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            user_id = _get_user_id(metadata)
            user_props = _get_user_props(metadata)
            name, extra = self.__get_model_params(serialized, kwargs)
            input = _parse_input(prompts)

            self.__track_event(
                "llm",
                "start",
                user_id=user_id,
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                name=name,
                input=input,
                tags=tags,
                extra=extra,
                metadata=metadata,
                user_props=user_props,
                app_id=self.__app_id,
            )
        except Exception as e:
            # Consistency fix: previously used warnings.warn while every other
            # handler logs via logger.error.
            logger.error(f"[LLMonitor] An error occurred in on_llm_start: {e}")

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        tags: Union[List[str], None] = None,
        metadata: Union[Dict[str, Any], None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report the start of a chat-model call to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            user_id = _get_user_id(metadata)
            user_props = _get_user_props(metadata)
            name, extra = self.__get_model_params(serialized, kwargs)
            # Only the first batch of messages is reported.
            input = _parse_lc_messages(messages[0])

            self.__track_event(
                "llm",
                "start",
                user_id=user_id,
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                name=name,
                input=input,
                tags=tags,
                extra=extra,
                metadata=metadata,
                user_props=user_props,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_chat_model_start: {e}")

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> None:
        """Report the end of an LLM call, with output and token usage."""
        if self.__has_valid_config is False:
            return
        try:
            token_usage = (response.llm_output or {}).get("token_usage", {})

            parsed_output: Any = [
                _parse_lc_message(generation.message)
                if hasattr(generation, "message")
                else generation.text
                for generation in response.generations[0]
            ]

            # if it's an array of 1, just parse the first element
            if len(parsed_output) == 1:
                parsed_output = parsed_output[0]

            self.__track_event(
                "llm",
                "end",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                output=parsed_output,
                token_usage={
                    "prompt": token_usage.get("prompt_tokens"),
                    "completion": token_usage.get("completion_tokens"),
                },
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_llm_end: {e}")

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        tags: Union[List[str], None] = None,
        metadata: Union[Dict[str, Any], None] = None,
        **kwargs: Any,
    ) -> None:
        """Report the start of a tool invocation to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            user_id = _get_user_id(metadata)
            user_props = _get_user_props(metadata)
            name = serialized.get("name")

            self.__track_event(
                "tool",
                "start",
                user_id=user_id,
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                name=name,
                input=input_str,
                tags=tags,
                metadata=metadata,
                user_props=user_props,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_tool_start: {e}")

    def on_tool_end(
        self,
        output: Any,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        tags: Union[List[str], None] = None,
        **kwargs: Any,
    ) -> None:
        """Report the end of a tool invocation to LLMonitor."""
        if self.__has_valid_config is False:
            return
        # Stringify only after the guard, so an invalid config stays a no-op.
        output = str(output)
        try:
            self.__track_event(
                "tool",
                "end",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                output=output,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_tool_end: {e}")

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        tags: Union[List[str], None] = None,
        metadata: Union[Dict[str, Any], None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report the start of a chain (or agent) run to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            # serialized["id"] is a module path list; index 3 is the class name.
            name = serialized.get("id", [None, None, None, None])[3]
            type_ = "chain"  # renamed from `type` to avoid shadowing the builtin
            metadata = metadata or {}

            agent_name = metadata.get("agent_name")
            if agent_name is None:
                agent_name = metadata.get("agentName")

            if name == "AgentExecutor" or name == "PlanAndExecute":
                type_ = "agent"
            if agent_name is not None:
                type_ = "agent"
                name = agent_name
            # Nested runs are always reported as chains, never as agents.
            if parent_run_id is not None:
                type_ = "chain"

            user_id = _get_user_id(metadata)
            user_props = _get_user_props(metadata)
            input = _parse_input(inputs)

            self.__track_event(
                type_,
                "start",
                user_id=user_id,
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                name=name,
                input=input,
                tags=tags,
                metadata=metadata,
                user_props=user_props,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_chain_start: {e}")

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report the end of a chain run to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            output = _parse_output(outputs)

            self.__track_event(
                "chain",
                "end",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                output=output,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_chain_end: {e}")

    def on_agent_action(
        self,
        action: AgentAction,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report an agent's tool selection as a tool-start event."""
        if self.__has_valid_config is False:
            return
        try:
            name = action.tool
            input = _parse_input(action.tool_input)

            self.__track_event(
                "tool",
                "start",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                name=name,
                input=input,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_agent_action: {e}")

    def on_agent_finish(
        self,
        finish: AgentFinish,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report the end of an agent run to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            output = _parse_output(finish.return_values)

            self.__track_event(
                "agent",
                "end",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                output=output,
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_agent_finish: {e}")

    def on_chain_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report a chain failure (message + traceback) to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            self.__track_event(
                "chain",
                "error",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                error={"message": str(error), "stack": traceback.format_exc()},
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_chain_error: {e}")

    def on_tool_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report a tool failure (message + traceback) to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            self.__track_event(
                "tool",
                "error",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                error={"message": str(error), "stack": traceback.format_exc()},
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_tool_error: {e}")

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Union[UUID, None] = None,
        **kwargs: Any,
    ) -> Any:
        """Report an LLM failure (message + traceback) to LLMonitor."""
        if self.__has_valid_config is False:
            return
        try:
            self.__track_event(
                "llm",
                "error",
                run_id=str(run_id),
                parent_run_id=str(parent_run_id) if parent_run_id else None,
                error={"message": str(error), "stack": traceback.format_exc()},
                app_id=self.__app_id,
            )
        except Exception as e:
            logger.error(f"[LLMonitor] An error occurred in on_llm_error: {e}")