import time
from typing import Any, Dict, List, Optional, cast

from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatGeneration, LLMResult
from langchain_core.utils import guard_import


def import_infino() -> Any:
    """Import the infino client."""
    return guard_import("infinopy").InfinoClient()
def import_tiktoken() -> Any:
    """Import tiktoken for counting tokens for OpenAI models."""
    return guard_import("tiktoken")
def get_num_tokens(string: str, openai_model_name: str) -> int:
    """Calculate num tokens for OpenAI with tiktoken package.

    Official documentation: https://github.com/openai/openai-cookbook/blob/main
    /examples/How_to_count_tokens_with_tiktoken.ipynb
    """
    tiktoken = import_tiktoken()

    encoding = tiktoken.encoding_for_model(openai_model_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens
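# A minimal usage sketch (illustrative, not part of the module): assuming the
# `tiktoken` package is installed, token counting works like this:
#
#     num = get_num_tokens("Hello, Infino!", openai_model_name="gpt-3.5-turbo")
#     print(num)  # the exact count depends on the model's encoding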
class InfinoCallbackHandler(BaseCallbackHandler):
    """Callback Handler that logs to Infino."""
    def __init__(
        self,
        model_id: Optional[str] = None,
        model_version: Optional[str] = None,
        verbose: bool = False,
    ) -> None:
        # Set Infino client
        self.client = import_infino()
        self.model_id = model_id
        self.model_version = model_version
        self.verbose = verbose
        self.is_chat_openai_model = False
        self.chat_openai_model_name = "gpt-3.5-turbo"
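    # A minimal attachment sketch (illustrative only): assuming an Infino
    # server is reachable and `langchain_openai` is installed, the handler is
    # passed to the model as a callback:
    #
    #     from langchain_openai import OpenAI
    #
    #     handler = InfinoCallbackHandler(model_id="my-app", model_version="0.1")
    #     llm = OpenAI(temperature=0.1)
    #     llm.invoke("Tell me a joke.", config={"callbacks": [handler]})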
    def _send_to_infino(
        self,
        key: str,
        value: Any,
        is_ts: bool = True,
    ) -> None:
        """Send the key-value to Infino.

        Parameters:
        key (str): the key to send to Infino.
        value (Any): the value to send to Infino.
        is_ts (bool): if True, the value is part of a time series, else it
        is sent as a log message.
        """
        payload = {
            "date": int(time.time()),
            key: value,
            "labels": {
                "model_id": self.model_id,
                "model_version": self.model_version,
            },
        }
        if self.verbose:
            print(f"Tracking {key} with Infino: {payload}")  # noqa: T201

        # Append to Infino time series only if is_ts is True, otherwise
        # append to Infino log.
        if is_ts:
            self.client.append_ts(payload)
        else:
            self.client.append_log(payload)
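    # For illustration, a latency data point sent through `_send_to_infino`
    # serializes to roughly the following payload (field values are examples):
    #
    #     {
    #         "date": 1700000000,
    #         "latency": 0.42,
    #         "labels": {"model_id": "my-app", "model_version": "0.1"},
    #     }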
    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any,
    ) -> None:
        """Log the prompts to Infino, and set start time and error flag."""
        for prompt in prompts:
            self._send_to_infino("prompt", prompt, is_ts=False)

        # Set the error flag to indicate no error (this will get overridden
        # in on_llm_error if an error occurs).
        self.error = 0

        # Set the start time (so that we can calculate the request
        # duration in on_llm_end).
        self.start_time = time.time()
    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Do nothing when a new token is generated."""
        pass
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Log the latency, error, token usage, and response to Infino."""
        # Calculate and track the request latency.
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        self._send_to_infino("latency", duration)

        # Track success or error flag.
        self._send_to_infino("error", self.error)

        # Track prompt response.
        for generations in response.generations:
            for generation in generations:
                self._send_to_infino("prompt_response", generation.text, is_ts=False)

        # Track token usage (for non-chat models).
        if (response.llm_output is not None) and isinstance(response.llm_output, Dict):
            token_usage = response.llm_output["token_usage"]
            if token_usage is not None:
                prompt_tokens = token_usage["prompt_tokens"]
                total_tokens = token_usage["total_tokens"]
                completion_tokens = token_usage["completion_tokens"]
                self._send_to_infino("prompt_tokens", prompt_tokens)
                self._send_to_infino("total_tokens", total_tokens)
                self._send_to_infino("completion_tokens", completion_tokens)

        # Track completion token usage (for openai chat models). Walk all
        # generation lists in the response rather than relying on the loop
        # variable leaked from the prompt-response loop above.
        if self.is_chat_openai_model:
            messages = " ".join(
                cast(str, cast(ChatGeneration, generation).message.content)
                for generations in response.generations
                for generation in generations
            )
            completion_tokens = get_num_tokens(
                messages, openai_model_name=self.chat_openai_model_name
            )
            self._send_to_infino("completion_tokens", completion_tokens)
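    # Sketch of reading the metrics back out of Infino (the `search_ts`
    # signature below follows the Infino example notebook and should be
    # treated as an assumption about the infinopy API):
    #
    #     client = handler.client
    #     response = client.search_ts("__name__", "latency", 0, int(time.time()))
    #     # `response` carries the recorded latency time series.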
    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Set the error flag."""
        self.error = 1
    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Do nothing when LLM chain starts."""
        pass
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Do nothing when LLM chain ends."""
        pass
    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing when LLM chain outputs an error."""
        pass
    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> None:
        """Do nothing when tool starts."""
        pass
    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Do nothing when agent takes a specific action."""
        pass
    def on_tool_end(
        self,
        output: str,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Do nothing when tool ends."""
        pass
    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing when tool outputs an error."""
        pass
    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        **kwargs: Any,
    ) -> None:
        """Run when LLM starts running."""
        # Currently, for chat models, we only support input prompts for ChatOpenAI.
        # Check if this model is a ChatOpenAI model.
        values = serialized.get("id")
        if values:
            for value in values:
                if value == "ChatOpenAI":
                    self.is_chat_openai_model = True
                    break

        # Track prompt tokens for ChatOpenAI model.
        if self.is_chat_openai_model:
            invocation_params = kwargs.get("invocation_params")
            if invocation_params:
                model_name = invocation_params.get("model_name")
                if model_name:
                    self.chat_openai_model_name = model_name
                    prompt_tokens = 0
                    for message_list in messages:
                        message_string = " ".join(
                            cast(str, msg.content) for msg in message_list
                        )
                        num_tokens = get_num_tokens(
                            message_string,
                            openai_model_name=self.chat_openai_model_name,
                        )
                        prompt_tokens += num_tokens

                    self._send_to_infino("prompt_tokens", prompt_tokens)

        if self.verbose:
            print(  # noqa: T201
                f"on_chat_model_start: is_chat_openai_model="
                f"{self.is_chat_openai_model}, "
                f"chat_openai_model_name={self.chat_openai_model_name}"
            )

        # Send the prompt to infino.
        prompt = " ".join(
            cast(str, msg.content) for sublist in messages for msg in sublist
        )
        self._send_to_infino("prompt", prompt, is_ts=False)

        # Set the error flag to indicate no error (this will get overridden
        # in on_llm_error if an error occurs).
        self.error = 0

        # Set the start time (so that we can calculate the request
        # duration in on_llm_end).
        self.start_time = time.time()
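    # A chat-model sketch (illustrative): assuming `langchain_openai` is
    # installed, the same handler tracks prompt and completion tokens for
    # ChatOpenAI via on_chat_model_start / on_llm_end:
    #
    #     from langchain_openai import ChatOpenAI
    #
    #     chat = ChatOpenAI(model="gpt-3.5-turbo")
    #     chat.invoke("Summarize Infino in one line.", config={"callbacks": [handler]})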