[docs]classAgentExecutorIterator:"""Iterator for AgentExecutor."""
[docs]def__init__(self,agent_executor:AgentExecutor,inputs:Any,callbacks:Callbacks=None,*,tags:Optional[list[str]]=None,metadata:Optional[Dict[str,Any]]=None,run_name:Optional[str]=None,run_id:Optional[UUID]=None,include_run_info:bool=False,yield_actions:bool=False,):""" Initialize the AgentExecutorIterator with the given AgentExecutor, inputs, and optional callbacks. Args: agent_executor (AgentExecutor): The AgentExecutor to iterate over. inputs (Any): The inputs to the AgentExecutor. callbacks (Callbacks, optional): The callbacks to use during iteration. Defaults to None. tags (Optional[list[str]], optional): The tags to use during iteration. Defaults to None. metadata (Optional[Dict[str, Any]], optional): The metadata to use during iteration. Defaults to None. run_name (Optional[str], optional): The name of the run. Defaults to None. run_id (Optional[UUID], optional): The ID of the run. Defaults to None. include_run_info (bool, optional): Whether to include run info in the output. Defaults to False. yield_actions (bool, optional): Whether to yield actions as they are generated. Defaults to False. """self._agent_executor=agent_executorself.inputs=inputsself.callbacks=callbacksself.tags=tagsself.metadata=metadataself.run_name=run_nameself.run_id=run_idself.include_run_info=include_run_infoself.yield_actions=yield_actionsself.reset()
_inputs:Dict[str,str]callbacks:Callbackstags:Optional[list[str]]metadata:Optional[Dict[str,Any]]run_name:Optional[str]run_id:Optional[UUID]include_run_info:boolyield_actions:bool@propertydefinputs(self)->Dict[str,str]:"""The inputs to the AgentExecutor."""returnself._inputs@inputs.setterdefinputs(self,inputs:Any)->None:self._inputs=self.agent_executor.prep_inputs(inputs)@propertydefagent_executor(self)->AgentExecutor:"""The AgentExecutor to iterate over."""returnself._agent_executor@agent_executor.setterdefagent_executor(self,agent_executor:AgentExecutor)->None:self._agent_executor=agent_executor# force re-prep inputs in case agent_executor's prep_inputs fn changedself.inputs=self.inputs@propertydefname_to_tool_map(self)->Dict[str,BaseTool]:"""A mapping of tool names to tools."""return{tool.name:toolfortoolinself.agent_executor.tools}@propertydefcolor_mapping(self)->Dict[str,str]:"""A mapping of tool names to colors."""returnget_color_mapping([tool.namefortoolinself.agent_executor.tools],excluded_colors=["green","red"],)
[docs]defreset(self)->None:""" Reset the iterator to its initial state, clearing intermediate steps, iterations, and time elapsed. """logger.debug("(Re)setting AgentExecutorIterator to fresh state")self.intermediate_steps:list[tuple[AgentAction,str]]=[]self.iterations=0# maybe better to start these on the first __anext__ call?self.time_elapsed=0.0self.start_time=time.time()
[docs]defupdate_iterations(self)->None:""" Increment the number of iterations and update the time elapsed. """self.iterations+=1self.time_elapsed=time.time()-self.start_timelogger.debug(f"Agent Iterations: {self.iterations} ({self.time_elapsed:.2f}s elapsed)")
[docs]defmake_final_outputs(self,outputs:Dict[str,Any],run_manager:Union[CallbackManagerForChainRun,AsyncCallbackManagerForChainRun],)->AddableDict:# have access to intermediate steps by design in iterator,# so return only outputs may as well always be true.prepared_outputs=AddableDict(self.agent_executor.prep_outputs(self.inputs,outputs,return_only_outputs=True))ifself.include_run_info:prepared_outputs[RUN_KEY]=RunInfo(run_id=run_manager.run_id)returnprepared_outputs
def__iter__(self:"AgentExecutorIterator")->Iterator[AddableDict]:logger.debug("Initialising AgentExecutorIterator")self.reset()callback_manager=CallbackManager.configure(self.callbacks,self.agent_executor.callbacks,self.agent_executor.verbose,self.tags,self.agent_executor.tags,self.metadata,self.agent_executor.metadata,)run_manager=callback_manager.on_chain_start(dumpd(self.agent_executor),self.inputs,self.run_id,name=self.run_name,)try:whileself.agent_executor._should_continue(self.iterations,self.time_elapsed):# take the next step: this plans next action, executes it,# yielding action and observation as they are generatednext_step_seq:NextStepOutput=[]forchunkinself.agent_executor._iter_next_step(self.name_to_tool_map,self.color_mapping,self.inputs,self.intermediate_steps,run_manager,):next_step_seq.append(chunk)# if we're yielding actions, yield them as they come# do not yield AgentFinish, which will be handled belowifself.yield_actions:ifisinstance(chunk,AgentAction):yieldAddableDict(actions=[chunk],messages=chunk.messages)elifisinstance(chunk,AgentStep):yieldAddableDict(steps=[chunk],messages=chunk.messages)# convert iterator output to format handled by _process_next_step_outputnext_step=self.agent_executor._consume_next_step(next_step_seq)# update iterations and time elapsedself.update_iterations()# decide if this is the final outputoutput=self._process_next_step_output(next_step,run_manager)is_final="intermediate_step"notinoutput# yield the final output always# for backwards compat, yield int. output if not yielding actionsifnotself.yield_actionsoris_final:yieldoutput# if final output reached, stop iterationifis_final:returnexceptBaseExceptionase:run_manager.on_chain_error(e)raise# if we got here means we exhausted iterations or timeyieldself._stop(run_manager)asyncdef__aiter__(self)->AsyncIterator[AddableDict]:""" N.B. __aiter__ must be a normal method, so need to initialize async run manager on first __anext__ call where we can await it """logger.debug("Initialising AgentExecutorIterator (async)")self.reset()callback_manager=AsyncCallbackManager.configure(self.callbacks,self.agent_executor.callbacks,self.agent_executor.verbose,self.tags,self.agent_executor.tags,self.metadata,self.agent_executor.metadata,)run_manager=awaitcallback_manager.on_chain_start(dumpd(self.agent_executor),self.inputs,self.run_id,name=self.run_name,)try:asyncwithasyncio_timeout(self.agent_executor.max_execution_time):whileself.agent_executor._should_continue(self.iterations,self.time_elapsed):# take the next step: this plans next action, executes it,# yielding action and observation as they are generatednext_step_seq:NextStepOutput=[]asyncforchunkinself.agent_executor._aiter_next_step(self.name_to_tool_map,self.color_mapping,self.inputs,self.intermediate_steps,run_manager,):next_step_seq.append(chunk)# if we're yielding actions, yield them as they come# do not yield AgentFinish, which will be handled belowifself.yield_actions:ifisinstance(chunk,AgentAction):yieldAddableDict(actions=[chunk],messages=chunk.messages)elifisinstance(chunk,AgentStep):yieldAddableDict(steps=[chunk],messages=chunk.messages)# convert iterator output to format handled by _process_next_stepnext_step=self.agent_executor._consume_next_step(next_step_seq)# update iterations and time elapsedself.update_iterations()# decide if this is the final outputoutput=awaitself._aprocess_next_step_output(next_step,run_manager)is_final="intermediate_step"notinoutput# yield the final output always# for backwards compat, yield int. output if not yielding actionsifnotself.yield_actionsoris_final:yieldoutput# if final output reached, stop iterationifis_final:returnexcept(TimeoutError,asyncio.TimeoutError):yieldawaitself._astop(run_manager)returnexceptBaseExceptionase:awaitrun_manager.on_chain_error(e)raise# if we got here means we exhausted iterations or timeyieldawaitself._astop(run_manager)def_process_next_step_output(self,next_step_output:Union[AgentFinish,List[Tuple[AgentAction,str]]],run_manager:CallbackManagerForChainRun,)->AddableDict:""" Process the output of the next step, handling AgentFinish and tool return cases. """logger.debug("Processing output of Agent loop step")ifisinstance(next_step_output,AgentFinish):logger.debug("Hit AgentFinish: _return -> on_chain_end -> run final output logic")returnself._return(next_step_output,run_manager=run_manager)self.intermediate_steps.extend(next_step_output)logger.debug("Updated intermediate_steps with step output")# Check for tool returniflen(next_step_output)==1:next_step_action=next_step_output[0]tool_return=self.agent_executor._get_tool_return(next_step_action)iftool_returnisnotNone:returnself._return(tool_return,run_manager=run_manager)returnAddableDict(intermediate_step=next_step_output)asyncdef_aprocess_next_step_output(self,next_step_output:Union[AgentFinish,List[Tuple[AgentAction,str]]],run_manager:AsyncCallbackManagerForChainRun,)->AddableDict:""" Process the output of the next async step, handling AgentFinish and tool return cases. """logger.debug("Processing output of async Agent loop step")ifisinstance(next_step_output,AgentFinish):logger.debug("Hit AgentFinish: _areturn -> on_chain_end -> run final output logic")returnawaitself._areturn(next_step_output,run_manager=run_manager)self.intermediate_steps.extend(next_step_output)logger.debug("Updated intermediate_steps with step output")# Check for tool returniflen(next_step_output)==1:next_step_action=next_step_output[0]tool_return=self.agent_executor._get_tool_return(next_step_action)iftool_returnisnotNone:returnawaitself._areturn(tool_return,run_manager=run_manager)returnAddableDict(intermediate_step=next_step_output)def_stop(self,run_manager:CallbackManagerForChainRun)->AddableDict:""" Stop the iterator and raise a StopIteration exception with the stopped response. """logger.warning("Stopping agent prematurely due to triggering stop condition")# this manually constructs agent finish with output keyoutput=self.agent_executor._action_agent.return_stopped_response(self.agent_executor.early_stopping_method,self.intermediate_steps,**self.inputs,)returnself._return(output,run_manager=run_manager)asyncdef_astop(self,run_manager:AsyncCallbackManagerForChainRun)->AddableDict:""" Stop the async iterator and raise a StopAsyncIteration exception with the stopped response. """logger.warning("Stopping agent prematurely due to triggering stop condition")output=self.agent_executor._action_agent.return_stopped_response(self.agent_executor.early_stopping_method,self.intermediate_steps,**self.inputs,)returnawaitself._areturn(output,run_manager=run_manager)def_return(self,output:AgentFinish,run_manager:CallbackManagerForChainRun)->AddableDict:""" Return the final output of the iterator. """returned_output=self.agent_executor._return(output,self.intermediate_steps,run_manager=run_manager)returned_output["messages"]=output.messagesrun_manager.on_chain_end(returned_output)returnself.make_final_outputs(returned_output,run_manager)asyncdef_areturn(self,output:AgentFinish,run_manager:AsyncCallbackManagerForChainRun)->AddableDict:""" Return the final output of the async iterator. """returned_output=awaitself.agent_executor._areturn(output,self.intermediate_steps,run_manager=run_manager)returned_output["messages"]=output.messagesawaitrun_manager.on_chain_end(returned_output)returnself.make_final_outputs(returned_output,run_manager)