Source code for langchain_community.callbacks.arthur_callback
"""ArthurAI's Callback Handler."""from__future__importannotationsimportosimportuuidfromcollectionsimportdefaultdictfromdatetimeimportdatetimefromtimeimporttimefromtypingimportTYPE_CHECKING,Any,DefaultDict,Dict,List,Optionalimportnumpyasnpfromlangchain_core.agentsimportAgentAction,AgentFinishfromlangchain_core.callbacksimportBaseCallbackHandlerfromlangchain_core.outputsimportLLMResultifTYPE_CHECKING:importarthuraifromarthurai.core.modelsimportArthurModelPROMPT_TOKENS="prompt_tokens"COMPLETION_TOKENS="completion_tokens"TOKEN_USAGE="token_usage"FINISH_REASON="finish_reason"DURATION="duration"def_lazy_load_arthur()->arthurai:"""Lazy load Arthur."""try:importarthuraiexceptImportErrorase:raiseImportError("To use the ArthurCallbackHandler you need the"" `arthurai` package. Please install it with"" `pip install arthurai`.",e,)returnarthurai
class ArthurCallbackHandler(BaseCallbackHandler):
    """Callback Handler that logs to Arthur platform.

    Arthur helps enterprise teams optimize model operations
    and performance at scale. The Arthur API tracks model
    performance, explainability, and fairness across tabular,
    NLP, and CV models. Our API is model- and platform-agnostic,
    and continuously scales with complex and dynamic enterprise needs.
    To learn more about Arthur, visit our website at
    https://www.arthur.ai/ or read the Arthur docs at
    https://docs.arthur.ai/
    """
    def __init__(
        self,
        arthur_model: ArthurModel,
    ) -> None:
        """Initialize callback handler."""
        super().__init__()
        arthurai = _lazy_load_arthur()
        Stage = arthurai.common.constants.Stage
        ValueType = arthurai.common.constants.ValueType

        self.arthur_model = arthur_model
        # save the attributes of this model to be used when preparing
        # inferences to log to Arthur in on_llm_end()
        self.attr_names = set([a.name for a in self.arthur_model.get_attributes()])
        self.input_attr = [
            x
            for x in self.arthur_model.get_attributes()
            if x.stage == Stage.ModelPipelineInput
            and x.value_type == ValueType.Unstructured_Text
        ][0].name
        self.output_attr = [
            x
            for x in self.arthur_model.get_attributes()
            if x.stage == Stage.PredictedValue
            and x.value_type == ValueType.Unstructured_Text
        ][0].name
        self.token_likelihood_attr = None
        if (
            len(
                [
                    x
                    for x in self.arthur_model.get_attributes()
                    if x.value_type == ValueType.TokenLikelihoods
                ]
            )
            > 0
        ):
            self.token_likelihood_attr = [
                x
                for x in self.arthur_model.get_attributes()
                if x.value_type == ValueType.TokenLikelihoods
            ][0].name

        self.run_map: DefaultDict[str, Any] = defaultdict(dict)
    @classmethod
    def from_credentials(
        cls,
        model_id: str,
        arthur_url: Optional[str] = "https://app.arthur.ai",
        arthur_login: Optional[str] = None,
        arthur_password: Optional[str] = None,
    ) -> ArthurCallbackHandler:
        """Initialize callback handler from Arthur credentials.

        Args:
            model_id (str): The ID of the arthur model to log to.
            arthur_url (str, optional): The URL of the Arthur instance to log to.
                Defaults to "https://app.arthur.ai".
            arthur_login (str, optional): The login to use to connect to Arthur.
                Defaults to None.
            arthur_password (str, optional): The password to use to connect to
                Arthur. Defaults to None.

        Returns:
            ArthurCallbackHandler: The initialized callback handler.
        """
        arthurai = _lazy_load_arthur()
        ArthurAI = arthurai.ArthurAI
        ResponseClientError = arthurai.common.exceptions.ResponseClientError

        # connect to Arthur
        if arthur_login is None:
            try:
                arthur_api_key = os.environ["ARTHUR_API_KEY"]
            except KeyError:
                raise ValueError(
                    "No Arthur authentication provided. Either give"
                    " a login to the ArthurCallbackHandler"
                    " or set an ARTHUR_API_KEY as an environment variable."
                )
            arthur = ArthurAI(url=arthur_url, access_key=arthur_api_key)
        else:
            if arthur_password is None:
                arthur = ArthurAI(url=arthur_url, login=arthur_login)
            else:
                arthur = ArthurAI(
                    url=arthur_url, login=arthur_login, password=arthur_password
                )

        # get model from Arthur by the provided model ID
        try:
            arthur_model = arthur.get_model(model_id)
        except ResponseClientError:
            raise ValueError(
                f"Was unable to retrieve model with id {model_id} from Arthur."
                " Make sure the ID corresponds to a model that is currently"
                " registered with your Arthur account."
            )

        return cls(arthur_model)
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """On LLM start, save the input prompts."""
        run_id = kwargs["run_id"]
        self.run_map[run_id]["input_texts"] = prompts
        self.run_map[run_id]["start_time"] = time()
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """On LLM end, send data to Arthur."""
        try:
            import pytz
        except ImportError as e:
            raise ImportError(
                "Could not import pytz. Please install it with 'pip install pytz'."
            ) from e

        run_id = kwargs["run_id"]

        # get the run params for this run ID, or raise an error if this run ID
        # has no corresponding metadata in self.run_map
        # (self.run_map is a defaultdict, so an explicit membership check is
        # needed: indexing a missing run_id would silently create an empty entry
        # instead of raising)
        if run_id not in self.run_map:
            raise KeyError(
                "This function has been called with a run_id"
                " that was never registered in on_llm_start()."
                " Restart and try running the LLM again"
            )
        run_map_data = self.run_map[run_id]

        # mark the duration time between on_llm_start() and on_llm_end()
        time_from_start_to_end = time() - run_map_data["start_time"]

        # create inferences to log to Arthur
        inferences = []
        for i, generations in enumerate(response.generations):
            for generation in generations:
                inference = {
                    "partner_inference_id": str(uuid.uuid4()),
                    "inference_timestamp": datetime.now(tz=pytz.UTC),
                    self.input_attr: run_map_data["input_texts"][i],
                    self.output_attr: generation.text,
                }

                if generation.generation_info is not None:
                    # add finish reason to the inference
                    # if generation info contains a finish reason and
                    # if the ArthurModel was registered to monitor finish_reason
                    if (
                        FINISH_REASON in generation.generation_info
                        and FINISH_REASON in self.attr_names
                    ):
                        inference[FINISH_REASON] = generation.generation_info[
                            FINISH_REASON
                        ]

                    # add token likelihoods data to the inference if the
                    # ArthurModel was registered to monitor token likelihoods
                    # (use .get() so generations without logprobs don't raise)
                    logprobs_data = generation.generation_info.get("logprobs")
                    if (
                        logprobs_data is not None
                        and self.token_likelihood_attr is not None
                    ):
                        logprobs = logprobs_data["top_logprobs"]
                        likelihoods = [
                            {k: np.exp(v) for k, v in logprobs[i].items()}
                            for i in range(len(logprobs))
                        ]
                        inference[self.token_likelihood_attr] = likelihoods

                # add token usage counts to the inference if the
                # ArthurModel was registered to monitor token usage
                if (
                    isinstance(response.llm_output, dict)
                    and TOKEN_USAGE in response.llm_output
                ):
                    token_usage = response.llm_output[TOKEN_USAGE]
                    if (
                        PROMPT_TOKENS in token_usage
                        and PROMPT_TOKENS in self.attr_names
                    ):
                        inference[PROMPT_TOKENS] = token_usage[PROMPT_TOKENS]
                    if (
                        COMPLETION_TOKENS in token_usage
                        and COMPLETION_TOKENS in self.attr_names
                    ):
                        inference[COMPLETION_TOKENS] = token_usage[COMPLETION_TOKENS]

                # add inference duration to the inference if the ArthurModel
                # was registered to monitor inference duration
                if DURATION in self.attr_names:
                    inference[DURATION] = time_from_start_to_end

                inferences.append(inference)

        # send inferences to arthur
        self.arthur_model.send_inferences(inferences)
    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """On chain start, do nothing."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """On chain end, do nothing."""

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing when LLM outputs an error."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """On new token, pass."""

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing when LLM chain outputs an error."""

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> None:
        """Do nothing when tool starts."""

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Do nothing when agent takes a specific action."""

    def on_tool_end(
        self,
        output: Any,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Do nothing when tool ends."""

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing when tool outputs an error."""
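# ---------------------------------------------------------------------------
# Example usage: a minimal sketch, not part of the module. Assumes the
# `langchain_openai` package is installed and a model is already registered
# with Arthur; "xxxx-xxxx", the login, and the password below are placeholders.
#
#     from langchain_openai import ChatOpenAI
#
#     handler = ArthurCallbackHandler.from_credentials(
#         model_id="xxxx-xxxx",
#         arthur_login="your-username",  # or set ARTHUR_API_KEY instead
#         arthur_password="your-password",
#     )
#
#     # Attach the handler so each completion is logged to Arthur as an
#     # inference: prompt, output, and (if the model monitors them) token
#     # usage, finish reason, duration, and token likelihoods.
#     llm = ChatOpenAI(callbacks=[handler])
#     llm.invoke("Write a haiku about model monitoring.")
# ---------------------------------------------------------------------------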