"""A Tracer implementation that records to LangChain endpoint."""from__future__importannotationsimportloggingimportwarningsfromconcurrent.futuresimportThreadPoolExecutorfromdatetimeimportdatetime,timezonefromtypingimportTYPE_CHECKING,Any,Optional,UnionfromuuidimportUUIDfromlangsmithimportClientfromlangsmithimportrun_treesasrtfromlangsmithimportutilsasls_utilsfrompydanticimportPydanticDeprecationWarningfromtenacityimport(Retrying,retry_if_exception_type,stop_after_attempt,wait_exponential_jitter,)fromlangchain_core.envimportget_runtime_environmentfromlangchain_core.loadimportdumpdfromlangchain_core.tracers.baseimportBaseTracerfromlangchain_core.tracers.schemasimportRunifTYPE_CHECKING:fromlangchain_core.messagesimportBaseMessagefromlangchain_core.outputsimportChatGenerationChunk,GenerationChunklogger=logging.getLogger(__name__)_LOGGED=set()_EXECUTOR:Optional[ThreadPoolExecutor]=None
def log_error_once(method: str, exception: Exception) -> None:
    """Log an error once.

    Args:
        method: The method that raised the exception.
        exception: The exception that was raised.
    """
    if (method, type(exception)) in _LOGGED:
        return
    _LOGGED.add((method, type(exception)))
    logger.error(exception)

def wait_for_all_tracers() -> None:
    """Wait for all tracers to finish."""
    if rt._CLIENT is not None:
        rt._CLIENT.flush()

def get_client() -> Client:
    """Get the client."""
    return rt.get_cached_client()

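# Usage sketch (illustrative; assumes LangSmith credentials such as
# LANGSMITH_API_KEY are configured in the environment):
#
#     client = get_client()
#     client.flush()  # the cached client is shared across tracer instances
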
def _get_executor() -> ThreadPoolExecutor:
    """Get the executor."""
    global _EXECUTOR  # noqa: PLW0603
    if _EXECUTOR is None:
        _EXECUTOR = ThreadPoolExecutor()
    return _EXECUTOR


def _run_to_dict(run: Run, exclude_inputs: bool = False) -> dict:
    # TODO: Update once langsmith moves to Pydantic V2 and we can swap run.dict
    # for run.model_dump
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=PydanticDeprecationWarning)
        res = {
            **run.dict(exclude={"child_runs", "inputs", "outputs"}),
            "outputs": run.outputs,
        }
        if not exclude_inputs:
            res["inputs"] = run.inputs
        return res

class LangChainTracer(BaseTracer):
    """Implementation of the SharedTracer that POSTS to the LangChain endpoint."""

    run_inline = True

    def __init__(
        self,
        example_id: Optional[Union[UUID, str]] = None,
        project_name: Optional[str] = None,
        client: Optional[Client] = None,
        tags: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the LangChain tracer.

        Args:
            example_id: The example ID.
            project_name: The project name. Defaults to the tracer project.
            client: The client. Defaults to the global client.
            tags: The tags. Defaults to an empty list.
            kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.example_id = (
            UUID(example_id) if isinstance(example_id, str) else example_id
        )
        self.project_name = project_name or ls_utils.get_tracer_project()
        self.client = client or get_client()
        self.tags = tags or []
        self.latest_run: Optional[Run] = None

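    # Usage sketch (illustrative; `chain` and the project name are hypothetical,
    # and LangSmith credentials are assumed to be set via environment variables).
    # The tracer is passed as a callback handler:
    #
    #     tracer = LangChainTracer(project_name="my-project")
    #     chain.invoke({"input": "hello"}, config={"callbacks": [tracer]})
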
    def on_chat_model_start(
        self,
        serialized: dict[str, Any],
        messages: list[list[BaseMessage]],
        *,
        run_id: UUID,
        tags: Optional[list[str]] = None,
        parent_run_id: Optional[UUID] = None,
        metadata: Optional[dict[str, Any]] = None,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> Run:
        """Start a trace for an LLM run.

        Args:
            serialized: The serialized model.
            messages: The messages.
            run_id: The run ID.
            tags: The tags. Defaults to None.
            parent_run_id: The parent run ID. Defaults to None.
            metadata: The metadata. Defaults to None.
            name: The name. Defaults to None.
            kwargs: Additional keyword arguments.

        Returns:
            Run: The run.
        """
        start_time = datetime.now(timezone.utc)
        if metadata:
            kwargs.update({"metadata": metadata})
        chat_model_run = Run(
            id=run_id,
            parent_run_id=parent_run_id,
            serialized=serialized,
            inputs={
                "messages": [[dumpd(msg) for msg in batch] for batch in messages]
            },
            extra=kwargs,
            events=[{"name": "start", "time": start_time}],
            start_time=start_time,
            run_type="llm",
            tags=tags,
            name=name,  # type: ignore[arg-type]
        )
        self._start_trace(chat_model_run)
        self._on_chat_model_start(chat_model_run)
        return chat_model_run

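    # Illustrative direct invocation (normally the callback manager calls this
    # hook; the values below are hypothetical):
    #
    #     from uuid import uuid4
    #     from langchain_core.messages import HumanMessage
    #
    #     run = tracer.on_chat_model_start(
    #         serialized={}, messages=[[HumanMessage(content="hi")]], run_id=uuid4()
    #     )
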
    def _persist_run(self, run: Run) -> None:
        # We want to free up more memory by avoiding keeping a reference to the
        # whole nested run tree.
        self.latest_run = Run.construct(
            **run.dict(exclude={"child_runs", "inputs", "outputs"}),
            inputs=run.inputs,
            outputs=run.outputs,
        )

    def get_run_url(self) -> str:
        """Get the LangSmith root run URL.

        Returns:
            str: The LangSmith root run URL.

        Raises:
            ValueError: If no traced run is found.
            ValueError: If the run URL cannot be found.
        """
        if not self.latest_run:
            msg = "No traced run found."
            raise ValueError(msg)
        # If this is the first run in a project, the project may not yet be created.
        # This method is only really useful for debugging flows, so we will assume
        # there is some tolerance for latency.
        for attempt in Retrying(
            stop=stop_after_attempt(5),
            wait=wait_exponential_jitter(),
            retry=retry_if_exception_type(ls_utils.LangSmithError),
        ):
            with attempt:
                return self.client.get_run_url(
                    run=self.latest_run, project_name=self.project_name
                )
        msg = "Failed to get run URL."
        raise ValueError(msg)

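    # Example (assumes a root run was already traced through this instance,
    # e.g. via the usage sketch above; `chain` is hypothetical):
    #
    #     tracer = LangChainTracer()
    #     chain.invoke({"input": "hello"}, config={"callbacks": [tracer]})
    #     print(tracer.get_run_url())
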
    def _get_tags(self, run: Run) -> list[str]:
        """Get combined tags for a run."""
        tags = set(run.tags or [])
        tags.update(self.tags or [])
        return list(tags)

    def _persist_run_single(self, run: Run) -> None:
        """Persist a run."""
        try:
            run_dict = _run_to_dict(run)
            run_dict["tags"] = self._get_tags(run)
            extra = run_dict.get("extra", {})
            extra["runtime"] = get_runtime_environment()
            run_dict["extra"] = extra
            inputs_ = run_dict.get("inputs")
            # Inputs count as "truthy" if there are multiple keys or the single
            # value is non-empty; later updates can then skip re-sending them.
            inputs_is_truthy = bool(
                inputs_ and (len(inputs_) > 1 or bool(next(iter(inputs_.values()))))
            )
            run.extra["inputs_is_truthy"] = inputs_is_truthy
            self.client.create_run(**run_dict, project_name=self.project_name)
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("post", e)
            raise

    def _update_run_single(self, run: Run) -> None:
        """Update a run."""
        try:
            exclude_inputs = run.extra.get("inputs_is_truthy", False)
            run_dict = _run_to_dict(run, exclude_inputs=exclude_inputs)
            run_dict["tags"] = self._get_tags(run)
            self.client.update_run(run.id, **run_dict)
        except Exception as e:
            # Errors are swallowed by the thread executor so we need to log them here
            log_error_once("patch", e)
            raise

    def _on_llm_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _llm_run_with_token_event(
        self,
        token: str,
        run_id: UUID,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Run:
        """Append token event to LLM run and return the run."""
        return super()._llm_run_with_token_event(
            # Drop the chunk; we don't need to save it
            token,
            run_id,
            chunk=None,
            parent_run_id=parent_run_id,
            **kwargs,
        )

    def _on_chat_model_start(self, run: Run) -> None:
        """Persist an LLM run."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_llm_end(self, run: Run) -> None:
        """Process the LLM Run."""
        self._update_run_single(run)

    def _on_llm_error(self, run: Run) -> None:
        """Process the LLM Run upon error."""
        self._update_run_single(run)

    def _on_chain_start(self, run: Run) -> None:
        """Process the Chain Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_chain_end(self, run: Run) -> None:
        """Process the Chain Run."""
        self._update_run_single(run)

    def _on_chain_error(self, run: Run) -> None:
        """Process the Chain Run upon error."""
        self._update_run_single(run)

    def _on_tool_start(self, run: Run) -> None:
        """Process the Tool Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_tool_end(self, run: Run) -> None:
        """Process the Tool Run."""
        self._update_run_single(run)

    def _on_tool_error(self, run: Run) -> None:
        """Process the Tool Run upon error."""
        self._update_run_single(run)

    def _on_retriever_start(self, run: Run) -> None:
        """Process the Retriever Run upon start."""
        if run.parent_run_id is None:
            run.reference_example_id = self.example_id
        self._persist_run_single(run)

    def _on_retriever_end(self, run: Run) -> None:
        """Process the Retriever Run."""
        self._update_run_single(run)

    def _on_retriever_error(self, run: Run) -> None:
        """Process the Retriever Run upon error."""
        self._update_run_single(run)

    def wait_for_futures(self) -> None:
        """Wait for all pending run uploads to complete."""
        if self.client is not None:
            self.client.flush()
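
# Flush sketch (illustrative): runs are uploaded in the background, so
# short-lived scripts should flush before exiting.
#
#     tracer = LangChainTracer()
#     ...  # traced workloads go here
#     tracer.wait_for_futures()  # flush this tracer's client
#     wait_for_all_tracers()     # or flush the shared global client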