[docs]classLogEntry(TypedDict):"""A single entry in the run log."""id:str"""ID of the sub-run."""name:str"""Name of the object being run."""type:str"""Type of the object being run, eg. prompt, chain, llm, etc."""tags:list[str]"""List of tags for the run."""metadata:dict[str,Any]"""Key-value pairs of metadata for the run."""start_time:str"""ISO-8601 timestamp of when the run started."""streamed_output_str:list[str]"""List of LLM tokens streamed by this run, if applicable."""streamed_output:list[Any]"""List of output chunks streamed by this run, if available."""inputs:NotRequired[Optional[Any]]"""Inputs to this run. Not available currently via astream_log."""final_output:Optional[Any]"""Final output of this run. Only available after the run has finished successfully."""end_time:Optional[str]"""ISO-8601 timestamp of when the run ended. Only available after the run has finished."""
class RunState(TypedDict):
    """State of the run."""

    id: str
    """ID of the run."""
    streamed_output: list[Any]
    """List of output chunks streamed by Runnable.stream()"""
    final_output: Optional[Any]
    """Final output of the run, usually the result of aggregating (`+`)
    streamed_output. Updated throughout the run when supported by the Runnable."""
    name: str
    """Name of the object being run."""
    type: str
    """Type of the object being run, eg. prompt, chain, llm, etc."""

    # Do we want tags/metadata on the root run? Client kinda knows it in most situations
    # tags: List[str]

    logs: dict[str, LogEntry]
    """Map of run names to sub-runs. If filters were supplied, this list will
    contain only the runs that matched the filters."""
class RunLogPatch:
    """Patch to the run log."""

    ops: list[dict[str, Any]]
    """List of jsonpatch operations, which describe how to create the run state
    from an empty dict. This is the minimal representation of the log, designed to
    be serialized as JSON and sent over the wire to reconstruct the log on the
    other side. Reconstruction of the state can be done with any jsonpatch-compliant
    library, see https://jsonpatch.com for more information."""

    def __init__(self, *ops: dict[str, Any]) -> None:
        """Create a patch from zero or more jsonpatch operations.

        NOTE(review): reconstructed constructor — the declared ``ops`` attribute
        is never assigned in the visible code, while the call sites in this file
        (``RunLogPatch(*ops)`` in ``send`` and ``self.ops + other.ops`` below)
        require exactly this signature with ``ops`` stored as a list. Confirm
        against upstream.

        Args:
            *ops: The jsonpatch operations making up this patch.
        """
        self.ops = list(ops)

    def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog:
        """Concatenate two patches into a materialized ``RunLog``.

        The combined ops are applied to an empty state (``None``) to produce
        the resulting run state.

        Raises:
            TypeError: If ``other`` is not exactly a ``RunLogPatch``.
        """
        if type(other) is RunLogPatch:
            ops = self.ops + other.ops
            # deepcopy so that applying the patch cannot mutate the stored ops
            # or leave the new state sharing mutable values with them.
            state = jsonpatch.apply_patch(None, copy.deepcopy(ops))
            return RunLog(*ops, state=state)

        msg = f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'"
        raise TypeError(msg)

    def __repr__(self) -> str:
        from pprint import pformat

        # 1:-1 to get rid of the [] around the list
        return f"RunLogPatch({pformat(self.ops)[1:-1]})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, RunLogPatch) and self.ops == other.ops
class RunLog(RunLogPatch):
    """Run log."""

    state: RunState
    """Current state of the log, obtained from applying all ops in sequence."""

    def __init__(self, *ops: dict[str, Any], state: RunState) -> None:
        """Create a run log from a state and the ops that produced it.

        NOTE(review): reconstructed constructor — the declared ``state``
        attribute is never assigned in the visible code, while the call sites
        in this file (``RunLog(*ops, state=state)`` and ``RunLog(state=None)``)
        require exactly this keyword-only signature. Confirm against upstream.

        Args:
            *ops: The jsonpatch operations applied so far.
            state: The current state of the run log.
        """
        super().__init__(*ops)
        self.state = state

    def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog:
        """Apply a patch on top of the current state, returning a new ``RunLog``.

        Raises:
            TypeError: If ``other`` is not exactly a ``RunLogPatch``
                (note: another ``RunLog`` is deliberately not accepted here,
                matching the ``type(...) is`` check).
        """
        if type(other) is RunLogPatch:
            ops = self.ops + other.ops
            # Incremental apply: only the new ops are applied to the
            # existing state, rather than replaying everything from scratch.
            state = jsonpatch.apply_patch(self.state, other.ops)
            return RunLog(*ops, state=state)

        msg = f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'"
        raise TypeError(msg)

    def __repr__(self) -> str:
        from pprint import pformat

        return f"RunLog({pformat(self.state)})"

    def __eq__(self, other: object) -> bool:
        # First compare that the state is the same
        if not isinstance(other, RunLog):
            return False
        if self.state != other.state:
            return False
        # Then compare that the ops are the same
        return super().__eq__(other)
# Generic element type for the tapped output iterators below
# (tap_output_aiter / tap_output_iter yield values unchanged).
T = TypeVar("T")
class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
    """Tracer that streams run logs to a stream."""

    def __init__(
        self,
        *,
        auto_close: bool = True,
        include_names: Optional[Sequence[str]] = None,
        include_types: Optional[Sequence[str]] = None,
        include_tags: Optional[Sequence[str]] = None,
        exclude_names: Optional[Sequence[str]] = None,
        exclude_types: Optional[Sequence[str]] = None,
        exclude_tags: Optional[Sequence[str]] = None,
        # Schema format is for internal use only.
        _schema_format: Literal["original", "streaming_events"] = "streaming_events",
    ) -> None:
        """A tracer that streams run logs to a stream.

        Args:
            auto_close: Whether to close the stream when the root run finishes.
            include_names: Only include runs from Runnables with matching names.
            include_types: Only include runs from Runnables with matching types.
            include_tags: Only include runs from Runnables with matching tags.
            exclude_names: Exclude runs from Runnables with matching names.
            exclude_types: Exclude runs from Runnables with matching types.
            exclude_tags: Exclude runs from Runnables with matching tags.
            _schema_format: Primarily changes how the inputs and outputs are
                handled. **For internal use only. This API will change.**

                - 'original' is the format used by all current tracers.
                  This format is slightly inconsistent with respect to inputs
                  and outputs.
                - 'streaming_events' is used for supporting streaming events,
                  for internal usage. It will likely change in the future, or
                  be deprecated entirely in favor of a dedicated async tracer
                  for streaming events.

        Raises:
            ValueError: If an invalid schema format is provided (internal use only).
        """
        if _schema_format not in {"original", "streaming_events"}:
            msg = (
                f"Invalid schema format: {_schema_format}. "
                f"Expected one of 'original', 'streaming_events'."
            )
            raise ValueError(msg)
        super().__init__(_schema_format=_schema_format)
        self.auto_close = auto_close
        self.include_names = include_names
        self.include_types = include_types
        self.include_tags = include_tags
        self.exclude_names = exclude_names
        self.exclude_types = exclude_types
        self.exclude_tags = exclude_tags
        # NOTE(review): asyncio.get_event_loop() is deprecated when no loop is
        # running (3.10+); this appears to assume construction happens on the
        # loop that will consume the stream — confirm before changing.
        loop = asyncio.get_event_loop()
        memory_stream = _MemoryStream[RunLogPatch](loop)
        # Lock guards the name-counter/key-map updates in _on_run_create,
        # which may be hit from multiple threads via sync callbacks.
        self.lock = threading.Lock()
        self.send_stream = memory_stream.get_send_stream()
        self.receive_stream = memory_stream.get_receive_stream()
        # Maps a run ID to its key in the RunState "logs" map
        # (the run name, suffixed with ":<n>" for repeated names).
        self._key_map_by_run_id: dict[UUID, str] = {}
        # How many runs with each name have been seen; used to build unique keys.
        self._counter_map_by_name: dict[str, int] = defaultdict(int)
        # ID of the root run; set by the first _on_run_create call.
        self.root_id: Optional[UUID] = None

    def send(self, *ops: dict[str, Any]) -> bool:
        """Send a patch to the stream, return False if the stream is closed.

        Args:
            *ops: The operations to send to the stream.

        Returns:
            bool: True if the patch was sent successfully, False if the stream
                is closed.
        """
        # We will likely want to wrap this in try / except at some point
        # to handle exceptions that might arise at run time.
        # For now we'll let the exception bubble up, and always return
        # True on the happy path.
        self.send_stream.send_nowait(RunLogPatch(*ops))
        return True

    async def tap_output_aiter(
        self, run_id: UUID, output: AsyncIterator[T]
    ) -> AsyncIterator[T]:
        """Tap an output async iterator to stream its values to the log.

        Args:
            run_id: The ID of the run.
            output: The output async iterator.

        Yields:
            T: The output value, passed through unchanged.
        """
        async for chunk in output:
            # root run is handled in .astream_log()
            # if we can't find the run silently ignore
            # eg. because this run wasn't included in the log
            if (
                run_id != self.root_id
                and (key := self._key_map_by_run_id.get(run_id))
                and (
                    not self.send(
                        {
                            "op": "add",
                            "path": f"/logs/{key}/streamed_output/-",
                            "value": chunk,
                        }
                    )
                )
            ):
                # Stream is closed; stop tapping (consumer went away).
                break

            yield chunk

    def tap_output_iter(self, run_id: UUID, output: Iterator[T]) -> Iterator[T]:
        """Tap an output iterator to stream its values to the log (sync twin
        of ``tap_output_aiter``).

        Args:
            run_id: The ID of the run.
            output: The output iterator.

        Yields:
            T: The output value, passed through unchanged.
        """
        for chunk in output:
            # root run is handled in .astream_log()
            # if we can't find the run silently ignore
            # eg. because this run wasn't included in the log
            if (
                run_id != self.root_id
                and (key := self._key_map_by_run_id.get(run_id))
                and (
                    not self.send(
                        {
                            "op": "add",
                            "path": f"/logs/{key}/streamed_output/-",
                            "value": chunk,
                        }
                    )
                )
            ):
                # Stream is closed; stop tapping (consumer went away).
                break

            yield chunk

    def include_run(self, run: Run) -> bool:
        """Check if a Run should be included in the log.

        The root run is never included (it is represented by the top-level
        RunState instead). Include filters are OR-ed together; exclude
        filters then AND away matches.

        Args:
            run: The Run to check.

        Returns:
            bool: True if the run should be included, False otherwise.
        """
        if run.id == self.root_id:
            return False

        run_tags = run.tags or []

        # No include filters at all => include everything by default.
        if (
            self.include_names is None
            and self.include_types is None
            and self.include_tags is None
        ):
            include = True
        else:
            include = False

        if self.include_names is not None:
            include = include or run.name in self.include_names
        if self.include_types is not None:
            include = include or run.run_type in self.include_types
        if self.include_tags is not None:
            include = include or any(tag in self.include_tags for tag in run_tags)

        if self.exclude_names is not None:
            include = include and run.name not in self.exclude_names
        if self.exclude_types is not None:
            include = include and run.run_type not in self.exclude_types
        if self.exclude_tags is not None:
            include = include and all(tag not in self.exclude_tags for tag in run_tags)

        return include

    def _persist_run(self, run: Run) -> None:
        # This is a legacy method only called once for an entire run tree
        # therefore not useful here
        pass

    def _on_run_create(self, run: Run) -> None:
        """Start a run."""
        if self.root_id is None:
            # First run seen becomes the root: emit the initial RunState.
            self.root_id = run.id
            if not self.send(
                {
                    "op": "replace",
                    "path": "",
                    "value": RunState(
                        id=str(run.id),
                        streamed_output=[],
                        final_output=None,
                        logs={},
                        name=run.name,
                        type=run.run_type,
                    ),
                }
            ):
                return

        if not self.include_run(run):
            return

        # Determine previous index, increment by 1
        with self.lock:
            self._counter_map_by_name[run.name] += 1
            count = self._counter_map_by_name[run.name]
            self._key_map_by_run_id[run.id] = (
                run.name if count == 1 else f"{run.name}:{count}"
            )

        entry = LogEntry(
            id=str(run.id),
            name=run.name,
            type=run.run_type,
            tags=run.tags or [],
            metadata=(run.extra or {}).get("metadata", {}),
            start_time=run.start_time.isoformat(timespec="milliseconds"),
            streamed_output=[],
            streamed_output_str=[],
            final_output=None,
            end_time=None,
        )

        if self._schema_format == "streaming_events":
            # If using streaming events let's add inputs as well
            entry["inputs"] = _get_standardized_inputs(run, self._schema_format)

        # Add the run to the stream
        self.send(
            {
                "op": "add",
                "path": f"/logs/{self._key_map_by_run_id[run.id]}",
                "value": entry,
            }
        )

    def _on_run_update(self, run: Run) -> None:
        """Finish a run."""
        try:
            index = self._key_map_by_run_id.get(run.id)

            if index is None:
                return

            ops: list[dict[str, Any]] = []

            if self._schema_format == "streaming_events":
                ops.append(
                    {
                        "op": "replace",
                        "path": f"/logs/{index}/inputs",
                        "value": _get_standardized_inputs(run, self._schema_format),
                    }
                )

            ops.extend(
                [
                    # Replace 'inputs' with final inputs
                    # This is needed because in many cases the inputs are not
                    # known until after the run is finished and the entire
                    # input stream has been processed by the runnable.
                    {
                        "op": "add",
                        "path": f"/logs/{index}/final_output",
                        # to undo the dumpd done by some runnables / tracer / etc
                        "value": _get_standardized_outputs(run, self._schema_format),
                    },
                    {
                        "op": "add",
                        "path": f"/logs/{index}/end_time",
                        "value": run.end_time.isoformat(timespec="milliseconds")
                        if run.end_time is not None
                        else None,
                    },
                ]
            )

            self.send(*ops)
        finally:
            # Close the stream when the root run finishes, if auto_close is set,
            # even when sending the final ops raised.
            if run.id == self.root_id and self.auto_close:
                self.send_stream.close()

    def _on_llm_new_token(
        self,
        run: Run,
        token: str,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]],
    ) -> None:
        """Process new LLM token."""
        index = self._key_map_by_run_id.get(run.id)

        if index is None:
            # Run was filtered out of the log; nothing to stream.
            return

        self.send(
            {
                "op": "add",
                "path": f"/logs/{index}/streamed_output_str/-",
                "value": token,
            },
            {
                "op": "add",
                "path": f"/logs/{index}/streamed_output/-",
                # Prefer the message chunk (preserves role/metadata) over the
                # bare token string when this is a chat model.
                "value": chunk.message
                if isinstance(chunk, ChatGenerationChunk)
                else token,
            },
        )
def _get_standardized_inputs(
    run: Run, schema_format: Literal["original", "streaming_events"]
) -> Optional[dict[str, Any]]:
    """Extract standardized inputs from a run.

    Standardizes the inputs based on the type of the runnable used.

    Args:
        run: Run object.
        schema_format: The schema format to use.

    Returns:
        Valid inputs are only dict. By convention, inputs are always represented
        as an invocation using named arguments.
        None means that the input is not yet known!

    Raises:
        NotImplementedError: If ``schema_format`` is ``"original"``.
    """
    if schema_format == "original":
        msg = (
            "Do not assign inputs with original schema drop the key for now."
            "When inputs are added to astream_log they should be added with "
            "standardized schema for streaming events."
        )
        raise NotImplementedError(msg)

    inputs = load(run.inputs)

    if run.run_type in {"retriever", "llm", "chat_model"}:
        return inputs

    # new style chains
    # These nest an additional 'input' key inside the 'inputs' to make sure
    # the input is always a dict. We need to unpack and use the inner value.
    inputs = inputs["input"]
    # We should try to fix this in Runnables and callbacks/tracers
    # Runnables should be using a None type here not a placeholder
    # dict.
    if inputs == {"input": ""}:  # Workaround for Runnables not using None
        # The input is not known, so we don't assign data['input']
        return None

    return inputs


def _get_standardized_outputs(
    run: Run, schema_format: Literal["original", "streaming_events", "original+chat"]
) -> Optional[Any]:
    """Extract standardized output from a run.

    Standardizes the outputs based on the type of the runnable used.

    Args:
        run: Run object.
        schema_format: The schema format to use.

    Returns:
        An output if returned, otherwise a None.
    """
    outputs = load(run.outputs)
    if schema_format == "original":
        if run.run_type == "prompt" and "output" in outputs:
            # These were previously dumped before the tracer.
            # Now we needn't do anything to them.
            return outputs["output"]
        # Return the old schema, without standardizing anything
        return outputs

    if run.run_type in {"retriever", "llm", "chat_model"}:
        return outputs

    if isinstance(outputs, dict):
        return outputs.get("output", None)

    return None


@overload
def _astream_log_implementation(
    runnable: Runnable[Input, Output],
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    stream: LogStreamCallbackHandler,
    diff: Literal[True] = True,
    with_streamed_output_list: bool = True,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch]: ...


@overload
def _astream_log_implementation(
    runnable: Runnable[Input, Output],
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    stream: LogStreamCallbackHandler,
    diff: Literal[False],
    with_streamed_output_list: bool = True,
    **kwargs: Any,
) -> AsyncIterator[RunLog]: ...


async def _astream_log_implementation(
    runnable: Runnable[Input, Output],
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    stream: LogStreamCallbackHandler,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    **kwargs: Any,
) -> Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]:
    """Implementation of astream_log for a given runnable.

    The implementation has been factored out (at least temporarily) as both
    astream_log and astream_events rely on it.

    When ``diff`` is True, yields incremental ``RunLogPatch`` objects;
    otherwise yields the accumulated ``RunLog`` state after each patch.
    """
    import jsonpatch  # type: ignore[import]

    from langchain_core.callbacks.base import BaseCallbackManager
    from langchain_core.tracers.log_stream import (
        RunLog,
        RunLogPatch,
    )

    # Assign the stream handler to the config
    config = ensure_config(config)
    callbacks = config.get("callbacks")
    if callbacks is None:
        config["callbacks"] = [stream]
    elif isinstance(callbacks, list):
        config["callbacks"] = callbacks + [stream]
    elif isinstance(callbacks, BaseCallbackManager):
        # Copy before mutating so the caller's manager is left untouched.
        callbacks = callbacks.copy()
        callbacks.add_handler(stream, inherit=True)
        config["callbacks"] = callbacks
    else:
        msg = (
            f"Unexpected type for callbacks: {callbacks}."
            "Expected None, list or AsyncCallbackManager."
        )
        raise ValueError(msg)

    # Call the runnable in streaming mode,
    # add each chunk to the output stream
    async def consume_astream() -> None:
        try:
            prev_final_output: Optional[Output] = None
            final_output: Optional[Output] = None

            async for chunk in runnable.astream(input, config, **kwargs):
                prev_final_output = final_output
                if final_output is None:
                    final_output = chunk
                else:
                    try:
                        final_output = final_output + chunk  # type: ignore
                    except TypeError:
                        # Chunks aren't additive; fall back to keeping only the
                        # latest chunk (and diff against an empty previous state).
                        prev_final_output = None
                        final_output = chunk
                patches: list[dict[str, Any]] = []
                if with_streamed_output_list:
                    patches.append(
                        {
                            "op": "add",
                            "path": "/streamed_output/-",
                            # chunk cannot be shared between
                            # streamed_output and final_output
                            # otherwise jsonpatch.apply will
                            # modify both
                            "value": copy.deepcopy(chunk),
                        }
                    )
                # Emit only the delta between the previous and new final output,
                # re-rooted under /final_output.
                for op in jsonpatch.JsonPatch.from_diff(
                    prev_final_output, final_output, dumps=dumps
                ):
                    patches.append({**op, "path": f"/final_output{op['path']}"})
                await stream.send_stream.send(RunLogPatch(*patches))
        finally:
            # Always close the stream so the consumer loop below terminates.
            await stream.send_stream.aclose()

    # Start the runnable in a task, so we can start consuming output
    task = asyncio.create_task(consume_astream())
    try:
        # Yield each chunk from the output stream
        if diff:
            async for log in stream:
                yield log
        else:
            # Accumulate patches into a RunLog and yield the state each time.
            state = RunLog(state=None)  # type: ignore[arg-type]
            async for log in stream:
                state = state + log
                yield state
    finally:
        # Wait for the runnable to finish, if not cancelled (eg. by break)
        with contextlib.suppress(asyncio.CancelledError):
            await task