def import_aim() -> Any:
    """Load the ``aim`` package and hand the module back to the caller.

    The lookup is delegated to ``guard_import``, which is expected to raise
    when the package is not installed.

    Returns:
        Whatever ``guard_import("aim")`` returns (the imported ``aim`` module).
    """
    aim_module = guard_import("aim")
    return aim_module
class BaseMetadataCallbackHandler:
    """Callback handler for the metadata and associated function states for callbacks.

    Attributes:
        step (int): The current step.
        starts (int): The number of times the start method has been called.
        ends (int): The number of times the end method has been called.
        errors (int): The number of times the error method has been called.
        text_ctr (int): The number of times the text method has been called.
        ignore_llm_ (bool): Whether to ignore llm callbacks.
        ignore_chain_ (bool): Whether to ignore chain callbacks.
        ignore_agent_ (bool): Whether to ignore agent callbacks.
        ignore_retriever_ (bool): Whether to ignore retriever callbacks.
        always_verbose_ (bool): Whether to always be verbose.
        chain_starts (int): The number of times the chain start method has been
            called.
        chain_ends (int): The number of times the chain end method has been called.
        llm_starts (int): The number of times the llm start method has been called.
        llm_ends (int): The number of times the llm end method has been called.
        llm_streams (int): The number of times the llm new-token method has been
            called.
        tool_starts (int): The number of times the tool start method has been
            called.
        tool_ends (int): The number of times the tool end method has been called.
        agent_ends (int): The number of times the agent end method has been called.

    NOTE(review): no constructor is visible in this class; the attributes above
    are assigned by ``reset_callback_meta``. Confirm that instances are
    initialised before the properties are read.
    """

    @property
    def always_verbose(self) -> bool:
        """Whether to call verbose callbacks even if verbose is False."""
        return self.always_verbose_

    @property
    def ignore_llm(self) -> bool:
        """Whether to ignore LLM callbacks."""
        return self.ignore_llm_

    @property
    def ignore_chain(self) -> bool:
        """Whether to ignore chain callbacks."""
        return self.ignore_chain_

    @property
    def ignore_agent(self) -> bool:
        """Whether to ignore agent callbacks."""
        return self.ignore_agent_

    @property
    def ignore_retriever(self) -> bool:
        """Whether to ignore retriever callbacks."""
        return self.ignore_retriever_

    def reset_callback_meta(self) -> None:
        """Reset the callback metadata.

        Every counter goes back to ``0`` and every ``ignore_*_`` /
        ``always_verbose_`` flag goes back to ``False``.
        """
        # Global counters.
        self.step = 0
        self.starts = 0
        self.ends = 0
        self.errors = 0
        self.text_ctr = 0

        # Behaviour flags. ``ignore_retriever_`` is reset together with its
        # siblings so the ``ignore_retriever`` property is always readable after
        # a reset and no flag survives a reset by accident.
        self.ignore_llm_ = False
        self.ignore_chain_ = False
        self.ignore_agent_ = False
        self.ignore_retriever_ = False
        self.always_verbose_ = False

        # Per-category counters.
        self.chain_starts = 0
        self.chain_ends = 0
        self.llm_starts = 0
        self.llm_ends = 0
        self.llm_streams = 0
        self.tool_starts = 0
        self.tool_ends = 0
        self.agent_ends = 0
class AimCallbackHandler(BaseMetadataCallbackHandler, BaseCallbackHandler):
    """Callback Handler that logs to Aim.

    Parameters:
        repo (:obj:`str`, optional): Aim repository path or Repo object to which
            Run object is bound. If skipped, default Repo is used.
        experiment_name (:obj:`str`, optional): Sets Run's `experiment` property.
            'default' if not specified. Can be used later to query runs/sequences.
        system_tracking_interval (:obj:`int`, optional): Sets the tracking interval
            in seconds for system usage metrics (CPU, Memory, etc.). Set to `None`
            to disable system metrics tracking.
        log_system_params (:obj:`bool`, optional): Enable/Disable logging of system
            params such as installed packages, git info, environment variables, etc.

    This handler will utilize the associated callback method called and formats
    the input of each callback function with metadata regarding the state of LLM run
    and then logs the response to Aim.

    NOTE(review): the methods below rely on ``self._run`` and on
    ``self.get_custom_callback_meta()``, neither of which is defined in the code
    visible here; confirm where they are set up.
    """

    @staticmethod
    def _text_for(payload: Any, key: str) -> str:
        """Return ``payload[key]`` as a string, or the whole payload as a string.

        The conventional ``"input"`` / ``"output"`` key is not guaranteed to be
        present, so a missing key falls back to the full payload instead of
        raising from inside a logging callback.
        """
        try:
            value = payload[key]
        except (KeyError, TypeError):
            value = payload
        return str(value)

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts: track every prompt as an Aim text."""
        aim = import_aim()

        self.step += 1
        self.llm_starts += 1
        self.starts += 1

        resp = {"action": "on_llm_start"}
        resp.update(self.get_custom_callback_meta())

        # The prompts are only read, so no defensive copy is needed.
        self._run.track(
            [aim.Text(prompt) for prompt in prompts],
            name="on_llm_start",
            context=resp,
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running: track the text of every generation."""
        aim = import_aim()

        self.step += 1
        self.llm_ends += 1
        self.ends += 1

        resp = {"action": "on_llm_end"}
        resp.update(self.get_custom_callback_meta())

        # ``response.generations`` is nested (one inner sequence per entry);
        # flatten it into a single list of texts. Read-only, so no copy.
        generated = [
            aim.Text(generation.text)
            for generations in response.generations
            for generation in generations
        ]
        self._run.track(
            generated,
            name="on_llm_end",
            context=resp,
        )

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run when LLM generates a new token; only counters are updated."""
        self.step += 1
        self.llm_streams += 1

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Run when LLM errors; only counters are updated."""
        self.step += 1
        self.errors += 1

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Run when chain starts running.

        Tracks ``inputs["input"]`` when that key exists, otherwise the whole
        ``inputs`` mapping, rendered as text.
        """
        aim = import_aim()

        self.step += 1
        self.chain_starts += 1
        self.starts += 1

        resp = {"action": "on_chain_start"}
        resp.update(self.get_custom_callback_meta())

        self._run.track(
            aim.Text(self._text_for(inputs, "input")),
            name="on_chain_start",
            context=resp,
        )

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Run when chain ends running.

        Tracks ``outputs["output"]`` when that key exists, otherwise the whole
        ``outputs`` mapping, rendered as text.
        """
        aim = import_aim()

        self.step += 1
        self.chain_ends += 1
        self.ends += 1

        resp = {"action": "on_chain_end"}
        resp.update(self.get_custom_callback_meta())

        self._run.track(
            aim.Text(self._text_for(outputs, "output")),
            name="on_chain_end",
            context=resp,
        )

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Run when chain errors; only counters are updated."""
        self.step += 1
        self.errors += 1

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        """Run when tool starts running: track the tool input text."""
        aim = import_aim()

        self.step += 1
        self.tool_starts += 1
        self.starts += 1

        resp = {"action": "on_tool_start"}
        resp.update(self.get_custom_callback_meta())

        self._run.track(aim.Text(input_str), name="on_tool_start", context=resp)

    def on_tool_end(self, output: Any, **kwargs: Any) -> None:
        """Run when tool ends running: track the tool output as text."""
        output = str(output)
        aim = import_aim()

        self.step += 1
        self.tool_ends += 1
        self.ends += 1

        resp = {"action": "on_tool_end"}
        resp.update(self.get_custom_callback_meta())

        self._run.track(aim.Text(output), name="on_tool_end", context=resp)

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        """Run when tool errors; only counters are updated."""
        self.step += 1
        self.errors += 1

    def on_text(self, text: str, **kwargs: Any) -> None:
        """Run on a text callback; only counters are updated, nothing is tracked."""
        self.step += 1
        self.text_ctr += 1

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
        """Run when agent ends running: track its output and log as one text.

        Uses ``finish.return_values["output"]`` when present, otherwise the whole
        ``return_values`` payload.
        """
        aim = import_aim()

        self.step += 1
        self.agent_ends += 1
        self.ends += 1

        resp = {"action": "on_agent_finish"}
        resp.update(self.get_custom_callback_meta())

        text = "OUTPUT:\n{}\n\nLOG:\n{}".format(
            self._text_for(finish.return_values, "output"), finish.log
        )
        self._run.track(aim.Text(text), name="on_agent_finish", context=resp)

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run on agent action: track the tool input and log as one text.

        The tool name is added to the tracking context. This callback bumps the
        ``tool_starts`` and ``starts`` counters.
        """
        aim = import_aim()

        self.step += 1
        self.tool_starts += 1
        self.starts += 1

        resp = {
            "action": "on_agent_action",
            "tool": action.tool,
        }
        resp.update(self.get_custom_callback_meta())

        text = "TOOL INPUT:\n{}\n\nLOG:\n{}".format(action.tool_input, action.log)
        self._run.track(aim.Text(text), name="on_agent_action", context=resp)

    def flush_tracker(
        self,
        repo: Optional[str] = None,
        experiment_name: Optional[str] = None,
        system_tracking_interval: Optional[int] = 10,
        log_system_params: bool = True,
        langchain_asset: Any = None,
        reset: bool = True,
        finish: bool = False,
    ) -> None:
        """Flush the tracker and reset the session.

        Args:
            repo (:obj:`str`, optional): Aim repository path or Repo object to which
                Run object is bound. A falsy value keeps the handler's current
                ``repo``.
            experiment_name (:obj:`str`, optional): Sets Run's `experiment`
                property. A falsy value keeps the handler's current experiment
                name. Can be used later to query runs/sequences.
            system_tracking_interval (:obj:`int`, optional): Sets the tracking
                interval in seconds for system usage metrics (CPU, Memory, etc.).
                A falsy value (``None`` or ``0``) keeps the handler's current
                interval.
            log_system_params (:obj:`bool`, optional): Enable/Disable logging of
                system params such as installed packages, git info, environment
                variables, etc. ``False`` keeps the handler's current setting.
            langchain_asset: The langchain asset to save. Its ``dict()`` items are
                stored on the current run on a best-effort basis.
            reset: Whether to reset the session (close the run, reset the
                counters and open a new run).
            finish: Whether to finish the run (close the run and reset the
                counters without opening a new one).

        Returns:
            None
        """
        if langchain_asset:
            self._save_asset(langchain_asset)

        if finish or reset:
            self._run.close()
            self.reset_callback_meta()

        if reset:
            aim = import_aim()

            # NOTE(review): because these fall back on falsiness, ``None`` / ``0``
            # / ``False`` can never be applied here, and the truthy defaults
            # (10, True) replace the stored values whenever the argument is
            # omitted. Kept as-is for backward compatibility; confirm intent.
            self.repo = repo or self.repo
            self.experiment_name = experiment_name or self.experiment_name
            self.system_tracking_interval = (
                system_tracking_interval or self.system_tracking_interval
            )
            self.log_system_params = log_system_params or self.log_system_params

            self._run = aim.Run(
                repo=self.repo,
                experiment=self.experiment_name,
                system_tracking_interval=self.system_tracking_interval,
                log_system_params=self.log_system_params,
            )
            self._run_hash = self._run.hash
            self.action_records = []

    def _save_asset(self, langchain_asset: Any) -> None:
        """Store the asset's ``dict()`` items on the current run, best-effort.

        Failures never propagate (flushing must not crash the caller), but they
        are logged instead of being silently discarded, and one bad value no
        longer prevents the remaining keys from being saved.
        """
        import logging

        logger = logging.getLogger(__name__)

        try:
            asset_items = list(langchain_asset.dict().items())
        except Exception:
            logger.warning(
                "Could not serialize langchain asset for Aim.", exc_info=True
            )
            return

        for key, value in asset_items:
            try:
                self._run.set(key, value, strict=False)
            except Exception:
                logger.warning(
                    "Could not save asset field %r to Aim.", key, exc_info=True
                )