"""Chain that takes in an input and produces an action and action input."""from__future__importannotationsimportasyncioimportjsonimportloggingimporttimefromabcimportabstractmethodfrompathlibimportPathfromtypingimport(Any,AsyncIterator,Callable,Dict,Iterator,List,Optional,Sequence,Tuple,Union,cast,)importyamlfromlangchain_core._apiimportdeprecatedfromlangchain_core.agentsimportAgentAction,AgentFinish,AgentStepfromlangchain_core.callbacksimport(AsyncCallbackManagerForChainRun,AsyncCallbackManagerForToolRun,BaseCallbackManager,CallbackManagerForChainRun,CallbackManagerForToolRun,Callbacks,)fromlangchain_core.exceptionsimportOutputParserExceptionfromlangchain_core.language_modelsimportBaseLanguageModelfromlangchain_core.messagesimportBaseMessagefromlangchain_core.output_parsersimportBaseOutputParserfromlangchain_core.promptsimportBasePromptTemplatefromlangchain_core.prompts.few_shotimportFewShotPromptTemplatefromlangchain_core.prompts.promptimportPromptTemplatefromlangchain_core.runnablesimportRunnable,RunnableConfig,ensure_configfromlangchain_core.runnables.utilsimportAddableDictfromlangchain_core.toolsimportBaseToolfromlangchain_core.utils.inputimportget_color_mappingfrompydanticimportBaseModel,ConfigDict,model_validatorfromtyping_extensionsimportSelffromlangchain._api.deprecationimportAGENT_DEPRECATION_WARNINGfromlangchain.agents.agent_iteratorimportAgentExecutorIteratorfromlangchain.agents.agent_typesimportAgentTypefromlangchain.agents.toolsimportInvalidToolfromlangchain.chains.baseimportChainfromlangchain.chains.llmimportLLMChainfromlangchain.utilities.asyncioimportasyncio_timeoutlogger=logging.getLogger(__name__)
class BaseSingleActionAgent(BaseModel):
    """Base Single Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]
    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not supported.
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        else:
            raise ValueError(
                f"Got unsupported early_stopping_method `{early_stopping_method}`"
            )
    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        **kwargs: Any,
    ) -> BaseSingleActionAgent:
        """Construct an agent from an LLM and tools.

        Args:
            llm: Language model to use.
            tools: Tools to use.
            callback_manager: Callback manager to use.
            kwargs: Additional arguments.

        Returns:
            BaseSingleActionAgent: Agent object.
        """
        raise NotImplementedError
    @property
    def _agent_type(self) -> str:
        """Return Identifier of an agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent.

        Returns:
            Dict: Dictionary representation of agent.
        """
        _dict = super().model_dump()
        try:
            _type = self._agent_type
        except NotImplementedError:
            _type = None
        if isinstance(_type, AgentType):
            _dict["_type"] = str(_type.value)
        elif _type is not None:
            _dict["_type"] = _type
        return _dict
    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        # Fetch dictionary to save
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving")

        if save_path.suffix == ".json":
            with open(file_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix.endswith((".yaml", ".yml")):
            with open(file_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")
    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {}
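As a rough illustration of the single-action contract above, here is a dependency-free sketch of a plan/act loop. `FakeAction` and `FakeFinish` are simplified stand-ins for `AgentAction` and `AgentFinish`, and `EchoAgent` is a hypothetical agent, not part of the library:

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union


@dataclass
class FakeAction:  # simplified stand-in for AgentAction
    tool: str
    tool_input: str
    log: str


@dataclass
class FakeFinish:  # simplified stand-in for AgentFinish
    return_values: dict
    log: str


class EchoAgent:
    """Calls an `echo` tool once, then finishes with its observation."""

    def plan(
        self, intermediate_steps: List[Tuple[FakeAction, str]], **kwargs
    ) -> Union[FakeAction, FakeFinish]:
        if not intermediate_steps:
            return FakeAction("echo", kwargs["input"], "calling echo")
        # One observation is enough; return it as the final answer.
        _, observation = intermediate_steps[-1]
        return FakeFinish({"output": observation}, "done")


def run(agent: EchoAgent, tools: Dict[str, object], user_input: str) -> str:
    """Minimal executor loop: plan, act, record, repeat until finish."""
    steps: List[Tuple[FakeAction, str]] = []
    while True:
        decision = agent.plan(steps, input=user_input)
        if isinstance(decision, FakeFinish):
            return decision.return_values["output"]
        observation = tools[decision.tool](decision.tool_input)
        steps.append((decision, observation))


print(run(EchoAgent(), {"echo": lambda s: s.upper()}, "hello"))  # -> HELLO
```

The real `AgentExecutor` below adds callbacks, streaming, early stopping, and error handling around essentially this loop.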
class BaseMultiActionAgent(BaseModel):
    """Base Multi Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]
    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """
    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Async given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """
    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not supported.
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish({"output": "Agent stopped due to max iterations."}, "")
        else:
            raise ValueError(
                f"Got unsupported early_stopping_method `{early_stopping_method}`"
            )
    @property
    def _agent_type(self) -> str:
        """Return Identifier of an agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().model_dump()
        try:
            _dict["_type"] = str(self._agent_type)
        except NotImplementedError:
            pass
        return _dict
    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Raises:
            NotImplementedError: If agent does not support saving.
            ValueError: If file_path is not json or yaml.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        # Fetch dictionary to save
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving.")

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(file_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix.endswith((".yaml", ".yml")):
            with open(file_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")
    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {}
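Both `save` implementations above dispatch on the file suffix after building the agent dictionary. A stdlib-only sketch of that dispatch (the YAML branch is stubbed out here because the real code relies on PyYAML; `save_dict` is a hypothetical helper, not a library function):

```python
import json
import tempfile
from pathlib import Path
from typing import Union


def save_dict(payload: dict, file_path: Union[Path, str]) -> Path:
    """Persist `payload` to JSON or YAML based on the file suffix."""
    save_path = Path(file_path)
    # Create parent directories, as save() does.
    save_path.parent.mkdir(parents=True, exist_ok=True)
    if save_path.suffix == ".json":
        save_path.write_text(json.dumps(payload, indent=4))
    elif save_path.suffix in (".yaml", ".yml"):
        # The real method calls yaml.dump; stubbed to stay stdlib-only.
        raise NotImplementedError("install PyYAML for YAML output")
    else:
        raise ValueError(f"{save_path} must be json or yaml")
    return save_path


out = save_dict({"_type": "demo-agent"}, Path(tempfile.mkdtemp()) / "agent.json")
print(json.loads(out.read_text())["_type"])  # -> demo-agent
```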
class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]):
    """Base class for parsing agent output into agent action/finish."""
    @abstractmethod
    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """Parse text into agent action/finish."""
class MultiActionAgentOutputParser(
    BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
    """Base class for parsing agent output into agent actions/finish.

    This is used for agents that can return multiple actions.
    """
    @abstractmethod
    def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
        """Parse text into agent actions/finish.

        Args:
            text: Text to parse.

        Returns:
            Union[List[AgentAction], AgentFinish]:
                List of agent actions or agent finish.
        """
class RunnableAgent(BaseSingleActionAgent):
    """Agent powered by Runnables."""

    runnable: Runnable[dict, Union[AgentAction, AgentFinish]]
    """Runnable to call to get agent action."""
    input_keys_arg: List[str] = []
    return_keys_arg: List[str] = []
    stream_runnable: bool = True
    """Whether to stream from the runnable or not.

    If True then underlying LLM is invoked in a streaming fashion to make it
    possible to get access to the individual LLM tokens when using stream_log
    with the Agent Executor. If False then LLM is invoked in a non-streaming
    fashion and individual LLM tokens will not be available in stream_log.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return self.return_keys_arg

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys."""
        return self.input_keys_arg
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        final_output: Any = None
        if self.stream_runnable:
            # Use streaming to make sure that the underlying LLM is invoked in a
            # streaming fashion to make it possible to get access to the individual
            # LLM tokens when using stream_log with the Agent Executor.
            # Because the response from the plan is not a generator, we need to
            # accumulate the output into final output and return that.
            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
                if final_output is None:
                    final_output = chunk
                else:
                    final_output += chunk
        else:
            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})

        return final_output
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        AgentAction,
        AgentFinish,
    ]:
        """Async based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        final_output: Any = None
        if self.stream_runnable:
            # Use streaming to make sure that the underlying LLM is invoked in a
            # streaming fashion to make it possible to get access to the individual
            # LLM tokens when using stream_log with the Agent Executor.
            # Because the response from the plan is not a generator, we need to
            # accumulate the output into final output and return that.
            async for chunk in self.runnable.astream(
                inputs, config={"callbacks": callbacks}
            ):
                if final_output is None:
                    final_output = chunk
                else:
                    final_output += chunk
        else:
            final_output = await self.runnable.ainvoke(
                inputs, config={"callbacks": callbacks}
            )

        return final_output
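The `plan`/`aplan` bodies above accumulate streamed chunks with `+=`, seeding the accumulator with the first chunk so it ends up with the chunk's own type rather than a guessed one. A dependency-free sketch of that pattern, where the `chunks` generator stands in for `runnable.stream(...)`:

```python
from typing import Any, Iterator


def chunks() -> Iterator[str]:
    # Stand-in for runnable.stream(...): yields partial outputs.
    # Real chunks are message/dict-like objects implementing __add__.
    yield from ["Final", " ", "Answer"]


final_output: Any = None
for chunk in chunks():
    if final_output is None:
        final_output = chunk  # first chunk seeds the accumulator
    else:
        final_output += chunk  # later chunks merge via __add__
print(final_output)  # -> Final Answer
```

Seeding with `None` rather than `""` is what lets the same loop work for any chunk type that supports `+`, not just strings.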
class RunnableMultiActionAgent(BaseMultiActionAgent):
    """Agent powered by Runnables."""

    runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]]
    """Runnable to call to get agent actions."""
    input_keys_arg: List[str] = []
    return_keys_arg: List[str] = []
    stream_runnable: bool = True
    """Whether to stream from the runnable or not.

    If True then underlying LLM is invoked in a streaming fashion to make it
    possible to get access to the individual LLM tokens when using stream_log
    with the Agent Executor. If False then LLM is invoked in a non-streaming
    fashion and individual LLM tokens will not be available in stream_log.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return self.return_keys_arg

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        Returns:
            List of input keys.
        """
        return self.input_keys_arg
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        List[AgentAction],
        AgentFinish,
    ]:
        """Based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        final_output: Any = None
        if self.stream_runnable:
            # Use streaming to make sure that the underlying LLM is invoked in a
            # streaming fashion to make it possible to get access to the individual
            # LLM tokens when using stream_log with the Agent Executor.
            # Because the response from the plan is not a generator, we need to
            # accumulate the output into final output and return that.
            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
                if final_output is None:
                    final_output = chunk
                else:
                    final_output += chunk
        else:
            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})

        return final_output
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        List[AgentAction],
        AgentFinish,
    ]:
        """Async based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        final_output: Any = None
        if self.stream_runnable:
            # Use streaming to make sure that the underlying LLM is invoked in a
            # streaming fashion to make it possible to get access to the individual
            # LLM tokens when using stream_log with the Agent Executor.
            # Because the response from the plan is not a generator, we need to
            # accumulate the output into final output and return that.
            async for chunk in self.runnable.astream(
                inputs, config={"callbacks": callbacks}
            ):
                if final_output is None:
                    final_output = chunk
                else:
                    final_output += chunk
        else:
            final_output = await self.runnable.ainvoke(
                inputs, config={"callbacks": callbacks}
            )

        return final_output
@deprecated(
    "0.1.0",
    message=AGENT_DEPRECATION_WARNING,
    removal="1.0",
)
class LLMSingleActionAgent(BaseSingleActionAgent):
    """Base class for single action agents."""

    llm_chain: LLMChain
    """LLMChain to use for agent."""
    output_parser: AgentOutputParser
    """Output parser to use for agent."""
    stop: List[str]
    """List of strings to stop on."""

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        Returns:
            List of input keys.
        """
        return list(set(self.llm_chain.input_keys) - {"intermediate_steps"})

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        del _dict["output_parser"]
        return _dict
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        output = self.llm_chain.run(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        return self.output_parser.parse(output)
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        output = await self.llm_chain.arun(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        return self.output_parser.parse(output)
    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {
            "llm_prefix": "",
            "observation_prefix": "" if len(self.stop) == 0 else self.stop[0],
        }
@deprecated(
    "0.1.0",
    message=AGENT_DEPRECATION_WARNING,
    removal="1.0",
)
class Agent(BaseSingleActionAgent):
    """Agent that calls the language model and decides the action.

    This is driven by a LLMChain. The prompt in the LLMChain MUST include
    a variable called "agent_scratchpad" where the agent can put its
    intermediary work.
    """

    llm_chain: LLMChain
    """LLMChain to use for agent."""
    output_parser: AgentOutputParser
    """Output parser to use for agent."""
    allowed_tools: Optional[List[str]] = None
    """Allowed tools for the agent. If None, all tools are allowed."""

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        del _dict["output_parser"]
        return _dict
    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def _fix_text(self, text: str) -> str:
        """Fix the text.

        Args:
            text: Text to fix.

        Returns:
            str: Fixed text.
        """
        raise ValueError("fix_text not implemented for this agent.")

    @property
    def _stop(self) -> List[str]:
        return [
            f"\n{self.observation_prefix.rstrip()}",
            f"\n\t{self.observation_prefix.rstrip()}",
        ]

    def _construct_scratchpad(
        self, intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> Union[str, List[BaseMessage]]:
        """Construct the scratchpad that lets the agent continue its thought process."""
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
        return thoughts
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs)
        return self.output_parser.parse(full_output)
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs)
        agent_output = await self.output_parser.aparse(full_output)
        return agent_output
    def get_full_inputs(
        self, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any
    ) -> Dict[str, Any]:
        """Create the full inputs for the LLMChain from intermediate steps.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            Dict[str, Any]: Full inputs for the LLMChain.
        """
        thoughts = self._construct_scratchpad(intermediate_steps)
        new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
        full_inputs = {**kwargs, **new_inputs}
        return full_inputs
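`_construct_scratchpad` and `get_full_inputs` above turn the accumulated (action, observation) pairs into the text injected into the prompt's `agent_scratchpad` variable. A dependency-free sketch of the scratchpad formatting, assuming ReAct-style `"Observation: "`/`"Thought: "` prefixes (in the real class these come from the abstract `observation_prefix`/`llm_prefix` properties of each subclass):

```python
from typing import List, Tuple

OBSERVATION_PREFIX = "Observation: "  # assumed; subclass-specific in reality
LLM_PREFIX = "Thought: "              # assumed; subclass-specific in reality


def construct_scratchpad(steps: List[Tuple[str, str]]) -> str:
    """`steps` holds (action_log, observation) pairs, like intermediate_steps."""
    thoughts = ""
    for log, observation in steps:
        thoughts += log
        thoughts += f"\n{OBSERVATION_PREFIX}{observation}\n{LLM_PREFIX}"
    return thoughts


pad = construct_scratchpad([("I should search for it\nAction: search", "42")])
print(pad)
# I should search for it
# Action: search
# Observation: 42
# Thought:
```

Ending the scratchpad with the LLM prefix is what cues the model to continue its chain of thought from where it left off.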
    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"})

    @model_validator(mode="after")
    def validate_prompt(self) -> Self:
        """Validate that prompt matches format.

        Args:
            values: Values to validate.

        Returns:
            Dict: Validated values.

        Raises:
            ValueError: If `agent_scratchpad` is not in prompt.input_variables
                and prompt is not a FewShotPromptTemplate or a PromptTemplate.
        """
        prompt = self.llm_chain.prompt
        if "agent_scratchpad" not in prompt.input_variables:
            logger.warning(
                "`agent_scratchpad` should be a variable in prompt.input_variables."
                " Did not find it, so adding it at the end."
            )
            prompt.input_variables.append("agent_scratchpad")
            if isinstance(prompt, PromptTemplate):
                prompt.template += "\n{agent_scratchpad}"
            elif isinstance(prompt, FewShotPromptTemplate):
                prompt.suffix += "\n{agent_scratchpad}"
            else:
                raise ValueError(f"Got unexpected prompt type {type(prompt)}")
        return self

    @property
    @abstractmethod
    def observation_prefix(self) -> str:
        """Prefix to append the observation with."""

    @property
    @abstractmethod
    def llm_prefix(self) -> str:
        """Prefix to append the LLM call with."""
    @classmethod
    @abstractmethod
    def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate:
        """Create a prompt for this class.

        Args:
            tools: Tools to use.

        Returns:
            BasePromptTemplate: Prompt template.
        """
    @classmethod
    def _validate_tools(cls, tools: Sequence[BaseTool]) -> None:
        """Validate that appropriate tools are passed in.

        Args:
            tools: Tools to use.
        """
        pass

    @classmethod
    @abstractmethod
    def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
        """Get default output parser for this class."""
    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        output_parser: Optional[AgentOutputParser] = None,
        **kwargs: Any,
    ) -> Agent:
        """Construct an agent from an LLM and tools.

        Args:
            llm: Language model to use.
            tools: Tools to use.
            callback_manager: Callback manager to use.
            output_parser: Output parser to use.
            kwargs: Additional arguments.

        Returns:
            Agent: Agent object.
        """
        cls._validate_tools(tools)
        llm_chain = LLMChain(
            llm=llm,
            prompt=cls.create_prompt(tools),
            callback_manager=callback_manager,
        )
        tool_names = [tool.name for tool in tools]
        _output_parser = output_parser or cls._get_default_output_parser()
        return cls(
            llm_chain=llm_chain,
            allowed_tools=tool_names,
            output_parser=_output_parser,
            **kwargs,
        )
    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not in ['force', 'generate'].
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        elif early_stopping_method == "generate":
            # Generate does one final forward pass
            thoughts = ""
            for action, observation in intermediate_steps:
                thoughts += action.log
                thoughts += (
                    f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
                )
            # Adding to the previous steps, we now tell the LLM to make a final pred
            thoughts += (
                "\n\nI now need to return a final answer based on the previous steps:"
            )
            new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
            full_inputs = {**kwargs, **new_inputs}
            full_output = self.llm_chain.predict(**full_inputs)
            # We try to extract a final answer
            parsed_output = self.output_parser.parse(full_output)
            if isinstance(parsed_output, AgentFinish):
                # If we can extract, we send the correct stuff
                return parsed_output
            else:
                # If we can extract, but the tool is not the final tool,
                # we just return the full output
                return AgentFinish({"output": full_output}, full_output)
        else:
            raise ValueError(
                "early_stopping_method should be one of `force` or `generate`, "
                f"got {early_stopping_method}"
            )
    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {
            "llm_prefix": self.llm_prefix,
            "observation_prefix": self.observation_prefix,
        }
class ExceptionTool(BaseTool):  # type: ignore[override]
    """Tool that just returns the query."""

    name: str = "_Exception"
    """Name of the tool."""
    description: str = "Exception tool"
    """Description of the tool."""

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        return query

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        return query
class AgentExecutor(Chain):
    """Agent that is using tools."""

    agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable]
    """The agent to run for creating a plan and determining actions
    to take at each step of the execution loop."""
    tools: Sequence[BaseTool]
    """The valid tools the agent can call."""
    return_intermediate_steps: bool = False
    """Whether to return the agent's trajectory of intermediate steps
    at the end in addition to the final output."""
    max_iterations: Optional[int] = 15
    """The maximum number of steps to take before ending the execution loop.

    Setting to 'None' could lead to an infinite loop."""
    max_execution_time: Optional[float] = None
    """The maximum amount of wall clock time to spend in the execution loop."""
    early_stopping_method: str = "force"
    """The method to use for early stopping if the agent never returns
    `AgentFinish`. Either 'force' or 'generate'.

    `"force"` returns a string saying that it stopped because it met a
        time or iteration limit.

    `"generate"` calls the agent's LLM Chain one final time to generate
        a final answer based on the previous steps.
    """
    handle_parsing_errors: Union[bool, str, Callable[[OutputParserException], str]] = (
        False
    )
    """How to handle errors raised by the agent's output parser.
    Defaults to `False`, which raises the error.
    If `True`, the error will be sent back to the LLM as an observation.
    If a string, the string itself will be sent to the LLM as an observation.
    If a callable function, the function will be called with the exception
    as an argument, and the result of that function will be passed to the agent
    as an observation.
    """
    trim_intermediate_steps: Union[
        int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]]
    ] = -1
    """How to trim the intermediate steps before returning them.
    Defaults to -1, which means no trimming.
    """
    @classmethod
    def from_agent_and_tools(
        cls,
        agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable],
        tools: Sequence[BaseTool],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> AgentExecutor:
        """Create from agent and tools.

        Args:
            agent: Agent to use.
            tools: Tools to use.
            callbacks: Callbacks to use.
            kwargs: Additional arguments.

        Returns:
            AgentExecutor: Agent executor object.
        """
        return cls(
            agent=agent,
            tools=tools,
            callbacks=callbacks,
            **kwargs,
        )
@model_validator(mode="after")defvalidate_tools(self)->Self:"""Validate that tools are compatible with agent. Args: values: Values to validate. Returns: Dict: Validated values. Raises: ValueError: If allowed tools are different than provided tools. """agent=self.agenttools=self.toolsallowed_tools=agent.get_allowed_tools()# type: ignoreifallowed_toolsisnotNone:ifset(allowed_tools)!=set([tool.namefortoolintools]):raiseValueError(f"Allowed tools ({allowed_tools}) different than "f"provided tools ({[tool.namefortoolintools]})")returnself@model_validator(mode="before")@classmethoddefvalidate_runnable_agent(cls,values:Dict)->Any:"""Convert runnable to agent if passed in. Args: values: Values to validate. Returns: Dict: Validated values. """agent=values.get("agent")ifagentandisinstance(agent,Runnable):try:output_type=agent.OutputTypeexceptExceptionas_:multi_action=Falseelse:multi_action=output_type==Union[List[AgentAction],AgentFinish]stream_runnable=values.pop("stream_runnable",True)ifmulti_action:values["agent"]=RunnableMultiActionAgent(runnable=agent,stream_runnable=stream_runnable)else:values["agent"]=RunnableAgent(runnable=agent,stream_runnable=stream_runnable)returnvalues@propertydef_action_agent(self)->Union[BaseSingleActionAgent,BaseMultiActionAgent]:"""Type cast self.agent. If the `agent` attribute is a Runnable, it will be converted one of RunnableAgentType in the validate_runnable_agent root_validator. To support instantiating with a Runnable, here we explicitly cast the type to reflect the changes made in the root_validator. """ifisinstance(self.agent,Runnable):returncast(RunnableAgentType,self.agent)else:returnself.agent
    def save(self, file_path: Union[Path, str]) -> None:
        """Raise error - saving not supported for Agent Executors.

        Args:
            file_path: Path to save to.

        Raises:
            ValueError: Saving not supported for agent executors.
        """
        raise ValueError(
            "Saving not supported for agent executors. "
            "If you are trying to save the agent, please use the "
            "`.save_agent(...)`"
        )
    def save_agent(self, file_path: Union[Path, str]) -> None:
        """Save the underlying agent.

        Args:
            file_path: Path to save to.
        """
        return self._action_agent.save(file_path)
    def iter(
        self,
        inputs: Any,
        callbacks: Callbacks = None,
        *,
        include_run_info: bool = False,
        async_: bool = False,  # arg kept for backwards compat, but ignored
    ) -> AgentExecutorIterator:
        """Enables iteration over steps taken to reach final output.

        Args:
            inputs: Inputs to the agent.
            callbacks: Callbacks to run.
            include_run_info: Whether to include run info.
            async_: Whether to run async. (Ignored)

        Returns:
            AgentExecutorIterator: Agent executor iterator object.
        """
        return AgentExecutorIterator(
            self,
            inputs,
            callbacks,
            tags=self.tags,
            include_run_info=include_run_info,
        )
    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return self._action_agent.input_keys

    @property
    def output_keys(self) -> List[str]:
        """Return the singular output key.

        :meta private:
        """
        if self.return_intermediate_steps:
            return self._action_agent.return_values + ["intermediate_steps"]
        else:
            return self._action_agent.return_values
    def lookup_tool(self, name: str) -> BaseTool:
        """Lookup tool by name.

        Args:
            name: Name of tool.

        Returns:
            BaseTool: Tool object.
        """
        return {tool.name: tool for tool in self.tools}[name]
    def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
        if self.max_iterations is not None and iterations >= self.max_iterations:
            return False
        if (
            self.max_execution_time is not None
            and time_elapsed >= self.max_execution_time
        ):
            return False
        return True

    def _return(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        if run_manager:
            run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    async def _areturn(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        if run_manager:
            await run_manager.on_agent_finish(
                output, color="green", verbose=self.verbose
            )
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    def _consume_next_step(
        self, values: NextStepOutput
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        if isinstance(values[-1], AgentFinish):
            assert len(values) == 1
            return values[-1]
        else:
            return [
                (a.action, a.observation) for a in values if isinstance(a, AgentStep)
            ]

    def _take_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        return self._consume_next_step(
            [
                a
                for a in self._iter_next_step(
                    name_to_tool_map,
                    color_mapping,
                    inputs,
                    intermediate_steps,
                    run_manager,
                )
            ]
        )

    def _iter_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.
        """
        try:
            intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

            # Call the LLM to see what to do.
            output = self._action_agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if isinstance(self.handle_parsing_errors, bool):
                raise_error = not self.handle_parsing_errors
            else:
                raise_error = False
            if raise_error:
                raise ValueError(
                    "An output parsing error occurred. "
                    "In order to pass this error back to the agent and have it try "
                    "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                    f"This is the error: {str(e)}"
                )
            text = str(e)
            if isinstance(self.handle_parsing_errors, bool):
                if e.send_to_llm:
                    observation = str(e.observation)
                    text = str(e.llm_output)
                else:
                    observation = "Invalid or incomplete response"
            elif isinstance(self.handle_parsing_errors, str):
                observation = self.handle_parsing_errors
            elif callable(self.handle_parsing_errors):
                observation = self.handle_parsing_errors(e)
            else:
                raise ValueError("Got unexpected type of `handle_parsing_errors`")
            output = AgentAction("_Exception", observation, text)
            if run_manager:
                run_manager.on_agent_action(output, color="green")
            tool_run_kwargs = self._action_agent.tool_run_logging_kwargs()
            observation = ExceptionTool().run(
                output.tool_input,
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            yield AgentStep(action=output, observation=observation)
            return

        # If the tool chosen is the finishing tool, then we end and return.
        if isinstance(output, AgentFinish):
            yield output
            return

        actions: List[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output
        for agent_action in actions:
            yield agent_action
        for agent_action in actions:
            yield self._perform_agent_action(
                name_to_tool_map, color_mapping, agent_action, run_manager
            )

    def _perform_agent_action(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        agent_action: AgentAction,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> AgentStep:
        if run_manager:
            run_manager.on_agent_action(agent_action, color="green")
        # Otherwise we lookup the tool
        if agent_action.tool in name_to_tool_map:
            tool = name_to_tool_map[agent_action.tool]
            return_direct = tool.return_direct
            color = color_mapping[agent_action.tool]
            tool_run_kwargs = self._action_agent.tool_run_logging_kwargs()
            if return_direct:
                tool_run_kwargs["llm_prefix"] = ""
            # We then call the tool on the tool input to get an observation
            observation = tool.run(
                agent_action.tool_input,
                verbose=self.verbose,
                color=color,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
        else:
            tool_run_kwargs = self._action_agent.tool_run_logging_kwargs()
            observation = InvalidTool().run(
                {
                    "requested_tool_name": agent_action.tool,
                    "available_tool_names": list(name_to_tool_map.keys()),
                },
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
        return AgentStep(action=agent_action, observation=observation)

    async def _atake_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        return self._consume_next_step(
            [
                a
                async for a in self._aiter_next_step(
                    name_to_tool_map,
                    color_mapping,
                    inputs,
                    intermediate_steps,
                    run_manager,
                )
            ]
        )

    async def _aiter_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manag
er:Optional[AsyncCallbackManagerForChainRun]=None,)->AsyncIterator[Union[AgentFinish,AgentAction,AgentStep]]:"""Take a single step in the thought-action-observation loop. Override this to take control of how the agent makes and acts on choices. """try:intermediate_steps=self._prepare_intermediate_steps(intermediate_steps)# Call the LLM to see what to do.output=awaitself._action_agent.aplan(intermediate_steps,callbacks=run_manager.get_child()ifrun_managerelseNone,**inputs,)exceptOutputParserExceptionase:ifisinstance(self.handle_parsing_errors,bool):raise_error=notself.handle_parsing_errorselse:raise_error=Falseifraise_error:raiseValueError("An output parsing error occurred. ""In order to pass this error back to the agent and have it try ""again, pass `handle_parsing_errors=True` to the AgentExecutor. "f"This is the error: {str(e)}")text=str(e)ifisinstance(self.handle_parsing_errors,bool):ife.send_to_llm:observation=str(e.observation)text=str(e.llm_output)else:observation="Invalid or incomplete response"elifisinstance(self.handle_parsing_errors,str):observation=self.handle_parsing_errorselifcallable(self.handle_parsing_errors):observation=self.handle_parsing_errors(e)else:raiseValueError("Got unexpected type of `handle_parsing_errors`")output=AgentAction("_Exception",observation,text)tool_run_kwargs=self._action_agent.tool_run_logging_kwargs()observation=awaitExceptionTool().arun(output.tool_input,verbose=self.verbose,color=None,callbacks=run_manager.get_child()ifrun_managerelseNone,**tool_run_kwargs,)yieldAgentStep(action=output,observation=observation)return# If the tool chosen is the finishing tool, then we end and return.ifisinstance(output,AgentFinish):yieldoutputreturnactions:List[AgentAction]ifisinstance(output,AgentAction):actions=[output]else:actions=outputforagent_actioninactions:yieldagent_action# Use asyncio.gather to run multiple tool.arun() calls 
concurrentlyresult=awaitasyncio.gather(*[self._aperform_agent_action(name_to_tool_map,color_mapping,agent_action,run_manager)foragent_actioninactions],)# TODO This could yield each result as it becomes availableforchunkinresult:yieldchunkasyncdef_aperform_agent_action(self,name_to_tool_map:Dict[str,BaseTool],color_mapping:Dict[str,str],agent_action:AgentAction,run_manager:Optional[AsyncCallbackManagerForChainRun]=None,)->AgentStep:ifrun_manager:awaitrun_manager.on_agent_action(agent_action,verbose=self.verbose,color="green")# Otherwise we lookup the toolifagent_action.toolinname_to_tool_map:tool=name_to_tool_map[agent_action.tool]return_direct=tool.return_directcolor=color_mapping[agent_action.tool]tool_run_kwargs=self._action_agent.tool_run_logging_kwargs()ifreturn_direct:tool_run_kwargs["llm_prefix"]=""# We then call the tool on the tool input to get an observationobservation=awaittool.arun(agent_action.tool_input,verbose=self.verbose,color=color,callbacks=run_manager.get_child()ifrun_managerelseNone,**tool_run_kwargs,)else:tool_run_kwargs=self._action_agent.tool_run_logging_kwargs()observation=awaitInvalidTool().arun({"requested_tool_name":agent_action.tool,"available_tool_names":list(name_to_tool_map.keys()),},verbose=self.verbose,color=None,callbacks=run_manager.get_child()ifrun_managerelseNone,**tool_run_kwargs,)returnAgentStep(action=agent_action,observation=observation)def_call(self,inputs:Dict[str,str],run_manager:Optional[CallbackManagerForChainRun]=None,)->Dict[str,Any]:"""Run text through and get agent response."""# Construct a mapping of tool name to tool for easy lookupname_to_tool_map={tool.name:toolfortoolinself.tools}# We construct a mapping from each tool to a color, used for logging.color_mapping=get_color_mapping([tool.namefortoolinself.tools],excluded_colors=["green","red"])intermediate_steps:List[Tuple[AgentAction,str]]=[]# Let's start tracking the number of iterations and time elapsediterations=0time_elapsed=0.0start_time=time.time()# We now 
enter the agent loop (until it returns something).whileself._should_continue(iterations,time_elapsed):next_step_output=self._take_next_step(name_to_tool_map,color_mapping,inputs,intermediate_steps,run_manager=run_manager,)ifisinstance(next_step_output,AgentFinish):returnself._return(next_step_output,intermediate_steps,run_manager=run_manager)intermediate_steps.extend(next_step_output)iflen(next_step_output)==1:next_step_action=next_step_output[0]# See if tool should return directlytool_return=self._get_tool_return(next_step_action)iftool_returnisnotNone:returnself._return(tool_return,intermediate_steps,run_manager=run_manager)iterations+=1time_elapsed=time.time()-start_timeoutput=self._action_agent.return_stopped_response(self.early_stopping_method,intermediate_steps,**inputs)returnself._return(output,intermediate_steps,run_manager=run_manager)asyncdef_acall(self,inputs:Dict[str,str],run_manager:Optional[AsyncCallbackManagerForChainRun]=None,)->Dict[str,str]:"""Async run text through and get agent response."""# Construct a mapping of tool name to tool for easy lookupname_to_tool_map={tool.name:toolfortoolinself.tools}# We construct a mapping from each tool to a color, used for logging.color_mapping=get_color_mapping([tool.namefortoolinself.tools],excluded_colors=["green"])intermediate_steps:List[Tuple[AgentAction,str]]=[]# Let's start tracking the number of iterations and time elapsediterations=0time_elapsed=0.0start_time=time.time()# We now enter the agent loop (until it returns something).try:asyncwithasyncio_timeout(self.max_execution_time):whileself._should_continue(iterations,time_elapsed):next_step_output=awaitself._atake_next_step(name_to_tool_map,color_mapping,inputs,intermediate_steps,run_manager=run_manager,)ifisinstance(next_step_output,AgentFinish):returnawaitself._areturn(next_step_output,intermediate_steps,run_manager=run_manager,)intermediate_steps.extend(next_step_output)iflen(next_step_output)==1:next_step_action=next_step_output[0]# See if tool 
should return directlytool_return=self._get_tool_return(next_step_action)iftool_returnisnotNone:returnawaitself._areturn(tool_return,intermediate_steps,run_manager=run_manager)iterations+=1time_elapsed=time.time()-start_timeoutput=self._action_agent.return_stopped_response(self.early_stopping_method,intermediate_steps,**inputs)returnawaitself._areturn(output,intermediate_steps,run_manager=run_manager)except(TimeoutError,asyncio.TimeoutError):# stop early when interrupted by the async timeoutoutput=self._action_agent.return_stopped_response(self.early_stopping_method,intermediate_steps,**inputs)returnawaitself._areturn(output,intermediate_steps,run_manager=run_manager)def_get_tool_return(self,next_step_output:Tuple[AgentAction,str])->Optional[AgentFinish]:"""Check if the tool is a returning tool."""agent_action,observation=next_step_outputname_to_tool_map={tool.name:toolfortoolinself.tools}return_value_key="output"iflen(self._action_agent.return_values)>0:return_value_key=self._action_agent.return_values[0]# Invalid tools won't be in the map, so we return False.ifagent_action.toolinname_to_tool_map:ifname_to_tool_map[agent_action.tool].return_direct:returnAgentFinish({return_value_key:observation},"",)returnNonedef_prepare_intermediate_steps(self,intermediate_steps:List[Tuple[AgentAction,str]])->List[Tuple[AgentAction,str]]:if(isinstance(self.trim_intermediate_steps,int)andself.trim_intermediate_steps>0):returnintermediate_steps[-self.trim_intermediate_steps:]elifcallable(self.trim_intermediate_steps):returnself.trim_intermediate_steps(intermediate_steps)else:returnintermediate_steps
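The trimming logic in `_prepare_intermediate_steps` can be sketched standalone. This is an illustrative, hypothetical helper (`trim_steps`), not part of the library; plain string tuples stand in for `(AgentAction, observation)` pairs:

```python
from typing import Callable, List, Tuple, Union

Step = Tuple[str, str]  # stand-in for (AgentAction, observation)


def trim_steps(
    steps: List[Step],
    trim: Union[int, Callable[[List[Step]], List[Step]]] = -1,
) -> List[Step]:
    """Keep only the last `trim` steps, apply a custom callable, or pass through."""
    if isinstance(trim, int) and trim > 0:
        return steps[-trim:]
    if callable(trim):
        return trim(steps)
    return steps
```

As in the method above, a non-positive integer (the usual default of `-1`) disables trimming entirely, which is why the positive check comes before the `callable` branch.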
[docs]
    def stream(
        self,
        input: Union[Dict[str, Any], Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[AddableDict]:
        """Enables streaming over steps taken to reach final output.

        Args:
            input: Input to the agent.
            config: Config to use.
            kwargs: Additional arguments.

        Yields:
            AddableDict: Addable dictionary.
        """
        config = ensure_config(config)
        iterator = AgentExecutorIterator(
            self,
            input,
            config.get("callbacks"),
            tags=config.get("tags"),
            metadata=config.get("metadata"),
            run_name=config.get("run_name"),
            run_id=config.get("run_id"),
            yield_actions=True,
            **kwargs,
        )
        for step in iterator:
            yield step
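Both the synchronous and asynchronous loops are bounded by the `_should_continue` guard defined at the top of this listing. A standalone sketch of that check (hypothetical function name, with the limits passed as arguments rather than read from the executor):

```python
from typing import Optional


def should_continue(
    iterations: int,
    time_elapsed: float,
    max_iterations: Optional[int] = None,
    max_execution_time: Optional[float] = None,
) -> bool:
    """Stop looping once either the iteration or the wall-clock budget is spent."""
    if max_iterations is not None and iterations >= max_iterations:
        return False
    if max_execution_time is not None and time_elapsed >= max_execution_time:
        return False
    return True
```

With both limits left as `None`, the loop runs until the agent itself returns an `AgentFinish`.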
[docs]
    async def astream(
        self,
        input: Union[Dict[str, Any], Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[AddableDict]:
        """Async enables streaming over steps taken to reach final output.

        Args:
            input: Input to the agent.
            config: Config to use.
            kwargs: Additional arguments.

        Yields:
            AddableDict: Addable dictionary.
        """
        config = ensure_config(config)
        iterator = AgentExecutorIterator(
            self,
            input,
            config.get("callbacks"),
            tags=config.get("tags"),
            metadata=config.get("metadata"),
            run_name=config.get("run_name"),
            run_id=config.get("run_id"),
            yield_actions=True,
            **kwargs,
        )
        async for step in iterator:
            yield step
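The chunks yielded by `stream` and `astream` are `AddableDict`s, meaning a caller can fold them into one result with `+`. A minimal sketch of that idea (this is an illustrative re-implementation, not the `langchain_core` class): dictionaries that merge by adding values present in both operands and copying the rest.

```python
from typing import Any, Dict


class MergeableDict(Dict[str, Any]):
    """Toy addable dictionary: `a + b` merges keys, concatenating shared values."""

    def __add__(self, other: "MergeableDict") -> "MergeableDict":
        merged = MergeableDict(self)
        for key, value in other.items():
            if key not in merged:
                merged[key] = value
            elif value is not None:
                # Assumes the shared values themselves support `+`
                # (strings, lists, nested addable dicts, ...).
                merged[key] = merged[key] + value
        return merged
```

A consumer of the stream would then accumulate `total = total + chunk` over the yielded chunks to recover the full output.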