"""A Tracer implementation that records to LangChain endpoint."""from__future__importannotationsimportloggingfromconcurrent.futuresimportThreadPoolExecutorfromdatetimeimportdatetime,timezonefromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,UnionfromuuidimportUUIDfromlangsmithimportClientfromlangsmithimportutilsasls_utilsfromtenacityimport(Retrying,retry_if_exception_type,stop_after_attempt,wait_exponential_jitter,)fromlangchain_core.envimportget_runtime_environmentfromlangchain_core.loadimportdumpdfromlangchain_core.tracers.baseimportBaseTracerfromlangchain_core.tracers.schemasimportRunifTYPE_CHECKING:fromlangchain_core.messagesimportBaseMessagelogger=logging.getLogger(__name__)_LOGGED=set()_CLIENT:Optional[Client]=None_EXECUTOR:Optional[ThreadPoolExecutor]=None
def log_error_once(method: str, exception: Exception) -> None:
    """Log an error once per (method, exception type) pair.

    Errors raised inside the tracing thread pool are swallowed by the
    executor, so callers log them here; deduplicating by method name and
    exception type keeps repeated failures from flooding the log.

    Args:
        method: The method that raised the exception (e.g. "post", "patch").
        exception: The exception that was raised.
    """
    # Compute the dedup key once; no `global` needed since _LOGGED is only
    # mutated in place, never rebound.
    key = (method, type(exception))
    if key in _LOGGED:
        return
    _LOGGED.add(key)
    logger.error(exception)
def wait_for_all_tracers() -> None:
    """Wait for all tracers to finish.

    Blocks until the global client's background tracing queue (if any) has
    been fully drained. No-op when no global client was ever created.
    """
    # _CLIENT is only read here, so no `global` declaration is required.
    if _CLIENT is not None and _CLIENT.tracing_queue is not None:
        _CLIENT.tracing_queue.join()
def get_client() -> Client:
    """Get the shared LangSmith client, creating it on first use.

    Returns:
        Client: The module-level singleton LangSmith client.
    """
    global _CLIENT  # rebinds the module-level singleton on first call
    if _CLIENT is None:
        _CLIENT = Client()
    return _CLIENT
def_get_executor()->ThreadPoolExecutor:"""Get the executor."""global_EXECUTORif_EXECUTORisNone:_EXECUTOR=ThreadPoolExecutor()return_EXECUTORdef_run_to_dict(run:Run)->dict:return{**run.dict(exclude={"child_runs","inputs","outputs"}),"inputs":run.inputs.copy()ifrun.inputsisnotNoneelseNone,"outputs":run.outputs.copy()ifrun.outputsisnotNoneelseNone,}
class LangChainTracer(BaseTracer):
    """Implementation of the SharedTracer that POSTS to the LangChain endpoint."""

    def __init__(
        self,
        example_id: Optional[Union[UUID, str]] = None,
        project_name: Optional[str] = None,
        client: Optional[Client] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the LangChain tracer.

        Args:
            example_id: The example ID.
            project_name: The project name. Defaults to the tracer project.
            client: The client. Defaults to the global client.
            tags: The tags. Defaults to an empty list.
            kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        # Accept either a UUID or its string form for convenience.
        self.example_id = (
            UUID(example_id) if isinstance(example_id, str) else example_id
        )
        self.project_name = project_name or ls_utils.get_tracer_project()
        self.client = client or get_client()
        self.tags = tags or []
        # Most recently traced root run; used by get_run_url().
        self.latest_run: Optional[Run] = None

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        parent_run_id: Optional[UUID] = None,
        metadata: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> Run:
        """Start a trace for an LLM run.

        Args:
            serialized: The serialized model.
            messages: The messages.
            run_id: The run ID.
            tags: The tags. Defaults to None.
            parent_run_id: The parent run ID. Defaults to None.
            metadata: The metadata. Defaults to None.
            name: The name. Defaults to None.
            kwargs: Additional keyword arguments.

        Returns:
            Run: The run.
        """
        start_time = datetime.now(timezone.utc)
        if metadata:
            # Metadata rides along in the run's `extra` payload.
            kwargs.update({"metadata": metadata})
        chat_model_run = Run(
            id=run_id,
            parent_run_id=parent_run_id,
            serialized=serialized,
            inputs={
                "messages": [[dumpd(msg) for msg in batch] for batch in messages]
            },
            extra=kwargs,
            events=[{"name": "start", "time": start_time}],
            start_time=start_time,
            run_type="llm",
            tags=tags,
            name=name,  # type: ignore[arg-type]
        )
        self._start_trace(chat_model_run)
        self._on_chat_model_start(chat_model_run)
        return chat_model_run

    def get_run_url(self) -> str:
        """Get the LangSmith root run URL.

        Returns:
            str: The LangSmith root run URL.

        Raises:
            ValueError: If no traced run is found.
            ValueError: If the run URL cannot be found.
        """
        if not self.latest_run:
            raise ValueError("No traced run found.")
        # If this is the first run in a project, the project may not yet be created.
        # This method is only really useful for debugging flows, so we will assume
        # there is some tolerance for latency.
        for attempt in Retrying(
            stop=stop_after_attempt(5),
            wait=wait_exponential_jitter(),
            retry=retry_if_exception_type(ls_utils.LangSmithError),
        ):
            with attempt:
                return self.client.get_run_url(
                    run=self.latest_run, project_name=self.project_name
                )
        raise ValueError("Failed to get run URL.")

    def _get_tags(self, run: Run) -> List[str]:
        """Get combined tags for a run (run's own tags merged with the tracer's)."""
        tags = set(run.tags or [])
        tags.update(self.tags or [])
        return list(tags)

    def _persist_run_single(self, run: Run) -> None:
        """Persist a run."""
        run_dict = _run_to_dict(run)
        run_dict["tags"] = self._get_tags(run)
        extra = run_dict.get("extra", {})
        # Attach runtime environment info (platform, library versions, ...).
        extra["runtime"] = get_runtime_environment()
        run_dict["extra"] = extra
        try:
            self.client.create_run(**run_dict, project_name=self.project_name)
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("post", e)
            raise

    def _update_run_single(self, run: Run) -> None:
        """Update a run."""
        try:
            run_dict = _run_to_dict(run)
            run_dict["tags"] = self._get_tags(run)
            self.client.update_run(run.id, **run_dict)
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("patch", e)
            raise

    def _on_llm_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            # Only attach the reference example to root runs.
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_chat_model_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_llm_end(self, run: Run) -> None:
        """Process the LLM Run."""
        self._update_run_single(run)

    def _on_llm_error(self, run: Run) -> None:
        """Process the LLM Run upon error."""
        self._update_run_single(run)

    def _on_chain_start(self, run: Run) -> None:
        """Process the Chain Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_chain_end(self, run: Run) -> None:
        """Process the Chain Run."""
        self._update_run_single(run)

    def _on_chain_error(self, run: Run) -> None:
        """Process the Chain Run upon error."""
        self._update_run_single(run)

    def _on_tool_start(self, run: Run) -> None:
        """Process the Tool Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_tool_end(self, run: Run) -> None:
        """Process the Tool Run."""
        self._update_run_single(run)

    def _on_tool_error(self, run: Run) -> None:
        """Process the Tool Run upon error."""
        self._update_run_single(run)

    def _on_retriever_start(self, run: Run) -> None:
        """Process the Retriever Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_retriever_end(self, run: Run) -> None:
        """Process the Retriever Run."""
        self._update_run_single(run)

    def _on_retriever_error(self, run: Run) -> None:
        """Process the Retriever Run upon error."""
        self._update_run_single(run)

    def wait_for_futures(self) -> None:
        """Wait for the given futures to complete."""
        if self.client is not None and self.client.tracing_queue is not None:
            self.client.tracing_queue.join()