Source code for langchain_community.callbacks.streamlit.streamlit_callback_handler
"""Callback Handler that prints to streamlit."""from__future__importannotationsfromenumimportEnumfromtypingimportTYPE_CHECKING,Any,Dict,List,NamedTuple,Optionalfromlangchain_core.agentsimportAgentAction,AgentFinishfromlangchain_core.callbacksimportBaseCallbackHandlerfromlangchain_core.outputsimportLLMResultfromlangchain_community.callbacks.streamlit.mutable_expanderimportMutableExpanderifTYPE_CHECKING:fromstreamlit.delta_generatorimportDeltaGeneratordef_convert_newlines(text:str)->str:"""Convert newline characters to markdown newline sequences (space, space, newline). """returntext.replace("\n"," \n")CHECKMARK_EMOJI="✅"THINKING_EMOJI=":thinking_face:"HISTORY_EMOJI=":books:"EXCEPTION_EMOJI="⚠️"


class LLMThoughtState(Enum):
    """Enumerator of the LLMThought state."""

    # The LLM is thinking about what to do next. We don't know which tool we'll run.
    THINKING = "THINKING"
    # The LLM has decided to run a tool. We don't have results from the tool yet.
    RUNNING_TOOL = "RUNNING_TOOL"
    # We have results from the tool.
    COMPLETE = "COMPLETE"


class ToolRecord(NamedTuple):
    """Tool record as a NamedTuple."""

    name: str
    input_str: str


class LLMThoughtLabeler:
    """Generates markdown labels for LLMThought containers.

    Pass a custom subclass of this to StreamlitCallbackHandler to override
    its default labeling logic.
    """

    @staticmethod
    def get_initial_label() -> str:
        """Return the markdown label for a new LLMThought that doesn't have
        an associated tool yet.
        """
        return f"{THINKING_EMOJI} **Thinking...**"

    @staticmethod
    def get_tool_label(tool: ToolRecord, is_complete: bool) -> str:
        """Return the label for an LLMThought that has an associated tool.

        Parameters
        ----------
        tool
            The tool's ToolRecord

        is_complete
            True if the thought is complete; False if the thought
            is still receiving input.

        Returns
        -------
        The markdown label for the thought's container.
        """
        input = tool.input_str
        name = tool.name
        emoji = CHECKMARK_EMOJI if is_complete else THINKING_EMOJI
        if name == "_Exception":
            emoji = EXCEPTION_EMOJI
            name = "Parsing error"
        idx = min([60, len(input)])
        input = input[0:idx]
        if len(tool.input_str) > idx:
            input = input + "..."
        input = input.replace("\n", " ")
        label = f"{emoji} **{name}:** {input}"
        return label

    @staticmethod
    def get_history_label() -> str:
        """Return a markdown label for the special 'history' container
        that contains overflow thoughts.
        """
        return f"{HISTORY_EMOJI} **History**"

    @staticmethod
    def get_final_agent_thought_label() -> str:
        """Return the markdown label for the agent's final thought -
        the "Now I have the answer" thought, that doesn't involve
        a tool.
        """
        return f"{CHECKMARK_EMOJI} **Complete!**"
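

# A minimal sketch of a custom labeler (the name `QuietLabeler` is
# hypothetical, not part of this module). Pass an instance via
# StreamlitCallbackHandler's `thought_labeler` parameter to override
# the default labels:
#
#     class QuietLabeler(LLMThoughtLabeler):
#         @staticmethod
#         def get_initial_label() -> str:
#             return "**Working...**"
#
#         @staticmethod
#         def get_final_agent_thought_label() -> str:
#             return "**Done**"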


class LLMThought:
    """A thought in the LLM's thought stream."""

    def __init__(
        self,
        parent_container: DeltaGenerator,
        labeler: LLMThoughtLabeler,
        expanded: bool,
        collapse_on_complete: bool,
    ):
        """Initialize the LLMThought.

        Args:
            parent_container: The container we're writing into.
            labeler: The labeler to use for this thought.
            expanded: Whether the thought should be expanded by default.
            collapse_on_complete: Whether the thought should be collapsed.
        """
        self._container = MutableExpander(
            parent_container=parent_container,
            label=labeler.get_initial_label(),
            expanded=expanded,
        )
        self._state = LLMThoughtState.THINKING
        self._llm_token_stream = ""
        self._llm_token_writer_idx: Optional[int] = None
        self._last_tool: Optional[ToolRecord] = None
        self._collapse_on_complete = collapse_on_complete
        self._labeler = labeler

    @property
    def container(self) -> MutableExpander:
        """The container we're writing into."""
        return self._container

    @property
    def last_tool(self) -> Optional[ToolRecord]:
        """The last tool executed by this thought"""
        return self._last_tool

    def _reset_llm_token_stream(self) -> None:
        self._llm_token_stream = ""
        self._llm_token_writer_idx = None

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # This is only called when the LLM is initialized with `streaming=True`
        self._llm_token_stream += _convert_newlines(token)
        self._llm_token_writer_idx = self._container.markdown(
            self._llm_token_stream, index=self._llm_token_writer_idx
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # `response` is the concatenation of all the tokens received by the LLM.
        # If we're receiving streaming tokens from `on_llm_new_token`, this response
        # data is redundant
        self._reset_llm_token_stream()

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self._container.markdown("**LLM encountered an error...**")
        self._container.exception(error)

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        # Called with the name of the tool we're about to run (in `serialized[name]`),
        # and its input. We change our container's label to be the tool name.
        self._state = LLMThoughtState.RUNNING_TOOL
        tool_name = serialized["name"]
        self._last_tool = ToolRecord(name=tool_name, input_str=input_str)
        self._container.update(
            new_label=self._labeler.get_tool_label(self._last_tool, is_complete=False)
        )

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        self._container.markdown("**Tool encountered an error...**")
        self._container.exception(error)

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        # Called when we're about to kick off a new tool. The `action` data
        # tells us the tool we're about to use, and the input we'll give it.
        # We don't output anything here, because we'll receive this same data
        # when `on_tool_start` is called immediately after.
        pass

    def complete(self, final_label: Optional[str] = None) -> None:
        """Finish the thought."""
        if final_label is None and self._state == LLMThoughtState.RUNNING_TOOL:
            assert (
                self._last_tool is not None
            ), "_last_tool should never be null when _state == RUNNING_TOOL"
            final_label = self._labeler.get_tool_label(
                self._last_tool, is_complete=True
            )
        self._state = LLMThoughtState.COMPLETE
        if self._collapse_on_complete:
            self._container.update(new_label=final_label, new_expanded=False)
        else:
            self._container.update(new_label=final_label)

    def clear(self) -> None:
        """Remove the thought from the screen. A cleared thought can't be reused."""
        self._container.clear()


class StreamlitCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes to a Streamlit app."""

    def __init__(
        self,
        parent_container: DeltaGenerator,
        *,
        max_thought_containers: int = 4,
        expand_new_thoughts: bool = True,
        collapse_completed_thoughts: bool = True,
        thought_labeler: Optional[LLMThoughtLabeler] = None,
    ):
        """Create a StreamlitCallbackHandler instance.

        Parameters
        ----------
        parent_container
            The `st.container` that will contain all the Streamlit elements that the
            Handler creates.

        max_thought_containers
            The max number of completed LLM thought containers to show at once. When
            this threshold is reached, a new thought will cause the oldest thoughts to
            be collapsed into a "History" expander. Defaults to 4.

        expand_new_thoughts
            Each LLM "thought" gets its own `st.expander`. This param controls whether
            that expander is expanded by default. Defaults to True.

        collapse_completed_thoughts
            If True, LLM thought expanders will be collapsed when completed.
            Defaults to True.

        thought_labeler
            An optional custom LLMThoughtLabeler instance. If unspecified, the handler
            will use the default thought labeling logic. Defaults to None.
        """
        self._parent_container = parent_container
        self._history_parent = parent_container.container()
        self._history_container: Optional[MutableExpander] = None
        self._current_thought: Optional[LLMThought] = None
        self._completed_thoughts: List[LLMThought] = []
        self._max_thought_containers = max(max_thought_containers, 1)
        self._expand_new_thoughts = expand_new_thoughts
        self._collapse_completed_thoughts = collapse_completed_thoughts
        self._thought_labeler = thought_labeler or LLMThoughtLabeler()

    def _require_current_thought(self) -> LLMThought:
        """Return our current LLMThought. Raise an error if we have no current
        thought.
        """
        if self._current_thought is None:
            raise RuntimeError("Current LLMThought is unexpectedly None!")
        return self._current_thought

    def _get_last_completed_thought(self) -> Optional[LLMThought]:
        """Return our most recent completed LLMThought, or None if we don't have one."""
        if len(self._completed_thoughts) > 0:
            return self._completed_thoughts[len(self._completed_thoughts) - 1]
        return None

    @property
    def _num_thought_containers(self) -> int:
        """The number of 'thought containers' we're currently showing: the
        number of completed thought containers, the history container (if it
        exists), and the current thought container (if it exists).
        """
        count = len(self._completed_thoughts)
        if self._history_container is not None:
            count += 1
        if self._current_thought is not None:
            count += 1
        return count

    def _complete_current_thought(self, final_label: Optional[str] = None) -> None:
        """Complete the current thought, optionally assigning it a new label.
        Add it to our _completed_thoughts list.
        """
        thought = self._require_current_thought()
        thought.complete(final_label)
        self._completed_thoughts.append(thought)
        self._current_thought = None

    def _prune_old_thought_containers(self) -> None:
        """If we have too many thoughts onscreen, move older thoughts to the
        'history container.'
        """
        while (
            self._num_thought_containers > self._max_thought_containers
            and len(self._completed_thoughts) > 0
        ):
            # Create our history container if it doesn't exist, and if
            # max_thought_containers is > 1. (if max_thought_containers is 1, we don't
            # have room to show history.)
            if self._history_container is None and self._max_thought_containers > 1:
                self._history_container = MutableExpander(
                    self._history_parent,
                    label=self._thought_labeler.get_history_label(),
                    expanded=False,
                )

            oldest_thought = self._completed_thoughts.pop(0)
            if self._history_container is not None:
                self._history_container.markdown(oldest_thought.container.label)
                self._history_container.append_copy(oldest_thought.container)
            oldest_thought.clear()
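

# Example usage (a minimal sketch, assuming a Streamlit app and an existing
# LangChain `agent_executor`; `agent_executor` and `prompt` are placeholders,
# not defined in this module). The handler is passed to the run via the
# standard `callbacks` entry of the invoke config:
#
#     import streamlit as st
#
#     st_callback = StreamlitCallbackHandler(st.container())
#     if prompt := st.chat_input():
#         result = agent_executor.invoke(
#             {"input": prompt}, {"callbacks": [st_callback]}
#         )
#         st.write(result["output"])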