"""A Tracer implementation that records to LangChain endpoint."""from__future__importannotationsimportloggingfromconcurrent.futuresimportThreadPoolExecutorfromdatetimeimportdatetime,timezonefromtypingimportTYPE_CHECKING,Any,Optional,UnionfromuuidimportUUIDfromlangsmithimportClientfromlangsmithimportrun_treesasrtfromlangsmithimportutilsasls_utilsfromtenacityimport(Retrying,retry_if_exception_type,stop_after_attempt,wait_exponential_jitter,)fromtyping_extensionsimportoverridefromlangchain_core.envimportget_runtime_environmentfromlangchain_core.loadimportdumpdfromlangchain_core.tracers.baseimportBaseTracerfromlangchain_core.tracers.schemasimportRunifTYPE_CHECKING:fromlangchain_core.messagesimportBaseMessagefromlangchain_core.outputsimportChatGenerationChunk,GenerationChunklogger=logging.getLogger(__name__)_LOGGED=set()_EXECUTOR:Optional[ThreadPoolExecutor]=None
def log_error_once(method: str, exception: Exception) -> None:
    """Log an error once.

    Args:
        method: The method that raised the exception.
        exception: The exception that was raised.
    """
    if (method, type(exception)) in _LOGGED:
        return
    _LOGGED.add((method, type(exception)))
    logger.error(exception)

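# A minimal sketch of the dedup behavior: the (method, exception type) pair is
# the dedup key, so repeated failures of the same kind are logged only once
# and cannot flood the logs:
#
#     log_error_once("post", ConnectionError("boom"))   # logged
#     log_error_once("post", ConnectionError("again"))  # suppressed (same key)
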
def wait_for_all_tracers() -> None:
    """Wait for all tracers to finish."""
    if rt._CLIENT is not None:  # noqa: SLF001
        rt._CLIENT.flush()  # noqa: SLF001

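# Typical use: flush queued trace uploads before a short-lived process exits so
# no runs are lost. A hedged sketch; `chain` is a placeholder runnable:
#
#     try:
#         chain.invoke({"input": "hello"})
#     finally:
#         wait_for_all_tracers()
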
def get_client() -> Client:
    """Get the client."""
    return rt.get_cached_client()

def _get_executor() -> ThreadPoolExecutor:
    """Get the executor."""
    global _EXECUTOR  # noqa: PLW0603
    if _EXECUTOR is None:
        _EXECUTOR = ThreadPoolExecutor()
    return _EXECUTOR

class LangChainTracer(BaseTracer):
    """Implementation of the SharedTracer that POSTs runs to the LangChain endpoint."""

    run_inline = True

    def __init__(
        self,
        example_id: Optional[Union[UUID, str]] = None,
        project_name: Optional[str] = None,
        client: Optional[Client] = None,
        tags: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the LangChain tracer.

        Args:
            example_id: The example ID.
            project_name: The project name. Defaults to the tracer project.
            client: The client. Defaults to the global client.
            tags: The tags. Defaults to an empty list.
            kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.example_id = (
            UUID(example_id) if isinstance(example_id, str) else example_id
        )
        self.project_name = project_name or ls_utils.get_tracer_project()
        self.client = client or get_client()
        self.tags = tags or []
        self.latest_run: Optional[Run] = None
        self.run_has_token_event_map: dict[str, bool] = {}

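    # A minimal usage sketch, assuming the LANGSMITH_API_KEY environment
    # variable is set; "my-project" and `llm` are placeholders:
    #
    #     tracer = LangChainTracer(project_name="my-project", tags=["prod"])
    #     llm.invoke("Hello", config={"callbacks": [tracer]})
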
    def on_chat_model_start(
        self,
        serialized: dict[str, Any],
        messages: list[list[BaseMessage]],
        *,
        run_id: UUID,
        tags: Optional[list[str]] = None,
        parent_run_id: Optional[UUID] = None,
        metadata: Optional[dict[str, Any]] = None,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> Run:
        """Start a trace for an LLM run.

        Args:
            serialized: The serialized model.
            messages: The messages.
            run_id: The run ID.
            tags: The tags. Defaults to None.
            parent_run_id: The parent run ID. Defaults to None.
            metadata: The metadata. Defaults to None.
            name: The name. Defaults to None.
            kwargs: Additional keyword arguments.

        Returns:
            Run: The run.
        """
        start_time = datetime.now(timezone.utc)
        if metadata:
            kwargs.update({"metadata": metadata})
        chat_model_run = Run(
            id=run_id,
            parent_run_id=parent_run_id,
            serialized=serialized,
            inputs={"messages": [[dumpd(msg) for msg in batch] for batch in messages]},
            extra=kwargs,
            events=[{"name": "start", "time": start_time}],
            start_time=start_time,
            run_type="llm",
            tags=tags,
            name=name,  # type: ignore[arg-type]
        )
        self._start_trace(chat_model_run)
        self._on_chat_model_start(chat_model_run)
        return chat_model_run

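    # This hook is normally invoked by the callback manager rather than called
    # directly; a direct call will attempt to POST the new run to LangSmith.
    # An illustrative sketch ("my-chat-model" is a placeholder name):
    #
    #     from uuid import uuid4
    #     from langchain_core.messages import HumanMessage
    #
    #     run = tracer.on_chat_model_start(
    #         serialized={"name": "my-chat-model"},
    #         messages=[[HumanMessage(content="Hi")]],
    #         run_id=uuid4(),
    #     )
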
    def _persist_run(self, run: Run) -> None:
        # We want to free up more memory by avoiding keeping a reference to the
        # whole nested run tree.
        self.latest_run = Run.construct(
            **run.dict(exclude={"child_runs", "inputs", "outputs"}),
            inputs=run.inputs,
            outputs=run.outputs,
        )

    def get_run_url(self) -> str:
        """Get the LangSmith root run URL.

        Returns:
            str: The LangSmith root run URL.

        Raises:
            ValueError: If no traced run is found.
            ValueError: If the run URL cannot be found.
        """
        if not self.latest_run:
            msg = "No traced run found."
            raise ValueError(msg)
        # If this is the first run in a project, the project may not yet be created.
        # This method is only really useful for debugging flows, so we will assume
        # there is some tolerance for latency.
        for attempt in Retrying(
            stop=stop_after_attempt(5),
            wait=wait_exponential_jitter(),
            retry=retry_if_exception_type(ls_utils.LangSmithError),
        ):
            with attempt:
                return self.client.get_run_url(
                    run=self.latest_run, project_name=self.project_name
                )
        msg = "Failed to get run URL."
        raise ValueError(msg)

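    # Example: after a traced invocation completes, fetch a shareable LangSmith
    # URL for the root run. A sketch; `chain` is a placeholder runnable:
    #
    #     chain.invoke({"input": "hi"}, config={"callbacks": [tracer]})
    #     print(tracer.get_run_url())
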
    def _get_tags(self, run: Run) -> list[str]:
        """Get combined tags for a run."""
        tags = set(run.tags or [])
        tags.update(self.tags or [])
        return list(tags)

    def _persist_run_single(self, run: Run) -> None:
        """Persist a run."""
        try:
            run.extra["runtime"] = get_runtime_environment()
            run.tags = self._get_tags(run)
            if run.ls_client is not self.client:
                run.ls_client = self.client
            run.post()
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("post", e)
            raise

    def _update_run_single(self, run: Run) -> None:
        """Update a run."""
        try:
            run.patch(exclude_inputs=run.extra.get("inputs_is_truthy", False))
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("patch", e)
            raise

    def _on_llm_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    @override
    def _llm_run_with_token_event(
        self,
        token: str,
        run_id: UUID,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        parent_run_id: Optional[UUID] = None,
    ) -> Run:
        """Append token event to LLM run and return the run."""
        run_id_str = str(run_id)
        # Record at most one token event per run; later tokens return the run
        # unchanged to keep trace payloads small.
        if run_id_str not in self.run_has_token_event_map:
            self.run_has_token_event_map[run_id_str] = True
        else:
            return self._get_run(run_id, run_type={"llm", "chat_model"})
        return super()._llm_run_with_token_event(
            # Drop the chunk; we don't need to save it
            token,
            run_id,
            chunk=None,
            parent_run_id=parent_run_id,
        )

    def _on_chat_model_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_llm_end(self, run: Run) -> None:
        """Process the LLM Run."""
        self._update_run_single(run)

    def _on_llm_error(self, run: Run) -> None:
        """Process the LLM Run upon error."""
        self._update_run_single(run)

    def _on_chain_start(self, run: Run) -> None:
        """Process the Chain Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_chain_end(self, run: Run) -> None:
        """Process the Chain Run."""
        self._update_run_single(run)

    def _on_chain_error(self, run: Run) -> None:
        """Process the Chain Run upon error."""
        self._update_run_single(run)

    def _on_tool_start(self, run: Run) -> None:
        """Process the Tool Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_tool_end(self, run: Run) -> None:
        """Process the Tool Run."""
        self._update_run_single(run)

    def _on_tool_error(self, run: Run) -> None:
        """Process the Tool Run upon error."""
        self._update_run_single(run)

    def _on_retriever_start(self, run: Run) -> None:
        """Process the Retriever Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_retriever_end(self, run: Run) -> None:
        """Process the Retriever Run."""
        self._update_run_single(run)

    def _on_retriever_error(self, run: Run) -> None:
        """Process the Retriever Run upon error."""
        self._update_run_single(run)

    def wait_for_futures(self) -> None:
        """Wait for all pending run uploads on this tracer's client to complete."""
        if self.client is not None:
            self.client.flush()
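
# Per-instance flush, in contrast to the module-level wait_for_all_tracers(),
# which flushes the shared cached client:
#
#     tracer.wait_for_futures()  # blocks until this tracer's queued uploads finish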