class PlanAndExecute(Chain):
    """Plan and execute a chain of steps.

    The planner turns the chain inputs into a plan; the executor is then run
    once per plan step, in plan order. Every (step, response) pair is added to
    ``step_container``, and the container's final response is returned under
    ``output_key``.
    """

    planner: BasePlanner
    """The planner to use."""
    executor: BaseExecutor
    """The executor to use."""
    # NOTE(review): nothing in this class clears the container; steps are only
    # ever added to it by `_call` / `_acall`.
    step_container: BaseStepContainer = Field(default_factory=ListStepContainer)
    """The step container to use."""
    # Key read from the chain inputs to obtain the objective.
    input_key: str = "input"
    # Key under which the final response is returned.
    output_key: str = "output"

    @property
    def input_keys(self) -> List[str]:
        """Return a one-element list holding ``input_key``."""
        keys = [self.input_key]
        return keys

    @property
    def output_keys(self) -> List[str]:
        """Return a one-element list holding ``output_key``."""
        keys = [self.output_key]
        return keys

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Plan, then run the executor on each plan step synchronously.

        Args:
            inputs: Chain inputs; ``inputs[self.input_key]`` is passed to the
                executor as the ``"objective"`` of every step.
            run_manager: Optional callback manager. When truthy, a child
                manager is handed to the planner and to each executor call,
                and the plan, each step value and each response are reported
                through ``on_text``.

        Returns:
            A dict mapping ``output_key`` to the step container's final
            response.
        """
        planner_callbacks = run_manager.get_child() if run_manager else None
        plan = self.planner.plan(inputs, callbacks=planner_callbacks)
        if run_manager:
            run_manager.on_text(str(plan), verbose=self.verbose)

        for step in plan.steps:
            # `inputs` is unpacked last, so a caller-supplied key wins over
            # the three derived keys on a name clash.
            step_inputs = {
                "previous_steps": self.step_container,
                "current_step": step,
                "objective": inputs[self.input_key],
                **inputs,
            }
            # A fresh child manager is requested for every executor call.
            step_callbacks = run_manager.get_child() if run_manager else None
            step_response = self.executor.step(
                step_inputs,
                callbacks=step_callbacks,
            )
            if run_manager:
                run_manager.on_text(
                    f"*****\n\nStep: {step.value}", verbose=self.verbose
                )
                run_manager.on_text(
                    f"\n\nResponse: {step_response.response}",
                    verbose=self.verbose,
                )
            self.step_container.add_step(step, step_response)

        return {self.output_key: self.step_container.get_final_response()}

    async def _acall(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Plan, then run the executor on each plan step asynchronously.

        Mirrors ``_call`` but awaits ``planner.aplan``, ``executor.astep`` and
        the ``on_text`` callbacks. Steps are still processed one after another
        in plan order.

        Args:
            inputs: Chain inputs; ``inputs[self.input_key]`` is passed to the
                executor as the ``"objective"`` of every step.
            run_manager: Optional async callback manager, used the same way as
                in ``_call``.

        Returns:
            A dict mapping ``output_key`` to the step container's final
            response.
        """
        planner_callbacks = run_manager.get_child() if run_manager else None
        plan = await self.planner.aplan(inputs, callbacks=planner_callbacks)
        if run_manager:
            await run_manager.on_text(str(plan), verbose=self.verbose)

        for step in plan.steps:
            # `inputs` is unpacked last, so a caller-supplied key wins over
            # the three derived keys on a name clash.
            step_inputs = {
                "previous_steps": self.step_container,
                "current_step": step,
                "objective": inputs[self.input_key],
                **inputs,
            }
            # A fresh child manager is requested for every executor call.
            step_callbacks = run_manager.get_child() if run_manager else None
            step_response = await self.executor.astep(
                step_inputs,
                callbacks=step_callbacks,
            )
            if run_manager:
                await run_manager.on_text(
                    f"*****\n\nStep: {step.value}", verbose=self.verbose
                )
                await run_manager.on_text(
                    f"\n\nResponse: {step_response.response}",
                    verbose=self.verbose,
                )
            self.step_container.add_step(step, step_response)

        return {self.output_key: self.step_container.get_final_response()}