class OpenAIAssistantFinish(AgentFinish):
    """An ``AgentFinish`` carrying the OpenAI run and thread metadata.

    Parameters:
        run_id: Run id.
        thread_id: Thread id.
    """

    run_id: str
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Check if the class is serializable by LangChain.

        Returns:
            False
        """
        # Opt out of LangChain serialization: the run/thread ids are transient.
        return False
class OpenAIAssistantAction(AgentAction):
    """An ``AgentAction`` with the info needed to submit a custom tool output
    back to an existing OpenAI Assistants run.

    Parameters:
        tool_call_id: Tool call id.
        run_id: Run id.
        thread_id: Thread id
    """

    tool_call_id: str
    run_id: str
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Check if the class is serializable by LangChain.

        Returns:
            False
        """
        # Opt out of LangChain serialization: ids only make sense in-session.
        return False
def _get_openai_client() -> openai.OpenAI:
    """Build a default synchronous OpenAI client.

    Raises:
        ImportError: If the ``openai`` package is not installed.
        AttributeError: If an openai package older than v1.1 is installed.
    """
    try:
        import openai

        return openai.OpenAI()
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from e


def _get_openai_async_client() -> openai.AsyncOpenAI:
    """Build a default asynchronous OpenAI client.

    Raises:
        ImportError: If the ``openai`` package is not installed.
        AttributeError: If an openai package older than v1.1 is installed.
    """
    try:
        import openai

        return openai.AsyncOpenAI()
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from e


def _is_assistants_builtin_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
    """Determine if tool corresponds to OpenAI Assistants built-in."""
    builtin_tool_types = ("code_interpreter", "retrieval")
    # Built-in tools are passed as plain dicts like {"type": "code_interpreter"}.
    if not isinstance(tool, dict):
        return False
    return tool.get("type") in builtin_tool_types


def _get_assistants_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
    """Convert a raw function/class to an OpenAI tool.

    Note that OpenAI assistants supports several built-in tools,
    such as "code_interpreter" and "retrieval."
    """
    # Built-in tool dicts are forwarded untouched; everything else goes
    # through the standard OpenAI tool conversion.
    if _is_assistants_builtin_tool(tool):
        return tool  # type: ignore
    return convert_to_openai_tool(tool)


OutputType = Union[
    List[OpenAIAssistantAction],
    OpenAIAssistantFinish,
    List["ThreadMessage"],
    List["RequiredActionFunctionToolCall"],
]
class OpenAIAssistantRunnable(RunnableSerializable[Dict, OutputType]):
    """Run an OpenAI Assistant.

    Example using OpenAI tools:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable

            interpreter_assistant = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=[{"type": "code_interpreter"}],
                model="gpt-4-1106-preview"
            )
            output = interpreter_assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and AgentExecutor:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )
            agent_executor = AgentExecutor(agent=agent, tools=tools)
            agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and custom execution:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain_core.agents import AgentFinish
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            def execute_agent(agent, tools, input):
                tool_map = {tool.name: tool for tool in tools}
                response = agent.invoke(input)
                while not isinstance(response, AgentFinish):
                    tool_outputs = []
                    for action in response:
                        tool_output = tool_map[action.tool].invoke(action.tool_input)
                        tool_outputs.append(
                            {"output": tool_output, "tool_call_id": action.tool_call_id}
                        )
                    response = agent.invoke(
                        {
                            "tool_outputs": tool_outputs,
                            "run_id": action.run_id,
                            "thread_id": action.thread_id
                        }
                    )
                return response

            response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"})
            next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})

    """  # noqa: E501

    client: Any = Field(default_factory=_get_openai_client)
    """OpenAI or AzureOpenAI client."""
    async_client: Any = None
    """OpenAI or AzureOpenAI async client."""
    assistant_id: str
    """OpenAI assistant id."""
    check_every_ms: float = 1_000.0
    """Frequency with which to check run progress in ms."""
    as_agent: bool = False
    """Use as a LangChain agent, compatible with the AgentExecutor."""

    @root_validator(pre=False, skip_on_failure=True)
    def validate_async_client(cls, values: dict) -> dict:
        """Lazily build an async client mirroring the sync client's credentials.

        Runs after field validation; only fires when the caller did not supply
        ``async_client`` explicitly.
        """
        if values["async_client"] is None:
            import openai

            # NOTE(review): reuses only the sync client's api_key — presumably
            # fine for openai.OpenAI, but an AzureOpenAI client carries extra
            # endpoint config that is not copied here. Verify with Azure users.
            api_key = values["client"].api_key
            values["async_client"] = openai.AsyncOpenAI(api_key=api_key)
        return values
[docs]@classmethoddefcreate_assistant(cls,name:str,instructions:str,tools:Sequence[Union[BaseTool,dict]],model:str,*,client:Optional[Union[openai.OpenAI,openai.AzureOpenAI]]=None,**kwargs:Any,)->OpenAIAssistantRunnable:"""Create an OpenAI Assistant and instantiate the Runnable. Args: name: Assistant name. instructions: Assistant instructions. tools: Assistant tools. Can be passed in OpenAI format or as BaseTools. model: Assistant model to use. client: OpenAI or AzureOpenAI client. Will create a default OpenAI client if not specified. kwargs: Additional arguments. Returns: OpenAIAssistantRunnable configured to run using the created assistant. """client=clientor_get_openai_client()assistant=client.beta.assistants.create(name=name,instructions=instructions,tools=[_get_assistants_tool(tool)fortoolintools],# type: ignoremodel=model,)returncls(assistant_id=assistant.id,client=client,**kwargs)
    def invoke(
        self, input: dict, config: Optional[RunnableConfig] = None
    ) -> OutputType:
        """Invoke assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial invocation.
                message_metadata: Metadata to associate with new message.
                thread_metadata: Metadata to associate with new thread. Only relevant
                    when new thread being created.
                instructions: Additional run instructions.
                model: Override Assistant model for this run.
                tools: Override Assistant tools for this run.
                run_metadata: Metadata to associate with new run.
            config: Runnable config. Defaults to None.

        Return:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish].
                Otherwise, will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        try:
            # Branch order matters: tool-output submission (agent loop) first,
            # then thread creation, then run creation, then raw submission.
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = self._parse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs)
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                thread = {
                    "messages": [
                        {
                            "role": "user",
                            "content": input["content"],
                            "metadata": input.get("message_metadata"),
                        }
                    ],
                    "metadata": input.get("thread_metadata"),
                }
                run = self._create_thread_and_run(input, thread)
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                _ = self.client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    metadata=input.get("message_metadata"),
                )
                run = self._create_run(input)
            # Submitting tool outputs to an existing run, outside the AgentExecutor
            # framework.
            else:
                run = self.client.beta.threads.runs.submit_tool_outputs(**input)
            # Poll until the run leaves the in_progress/queued states.
            run = self._wait_for_run(run.id, run.thread_id)
        except BaseException as e:
            # Surface any failure (including KeyboardInterrupt) to callbacks
            # before re-raising.
            run_manager.on_chain_error(e)
            raise e
        try:
            response = self._get_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise e
        else:
            run_manager.on_chain_end(response)
        return response
[docs]@classmethodasyncdefacreate_assistant(cls,name:str,instructions:str,tools:Sequence[Union[BaseTool,dict]],model:str,*,async_client:Optional[Union[openai.AsyncOpenAI,openai.AsyncAzureOpenAI]]=None,**kwargs:Any,)->OpenAIAssistantRunnable:"""Async create an AsyncOpenAI Assistant and instantiate the Runnable. Args: name: Assistant name. instructions: Assistant instructions. tools: Assistant tools. Can be passed in OpenAI format or as BaseTools. model: Assistant model to use. async_client: AsyncOpenAI client. Will create default async_client if not specified. Returns: AsyncOpenAIAssistantRunnable configured to run using the created assistant. """async_client=async_clientor_get_openai_async_client()openai_tools=[_get_assistants_tool(tool)fortoolintools]assistant=awaitasync_client.beta.assistants.create(name=name,instructions=instructions,tools=openai_tools,# type: ignoremodel=model,)returncls(assistant_id=assistant.id,async_client=async_client,**kwargs)
[docs]asyncdefainvoke(self,input:dict,config:Optional[RunnableConfig]=None,**kwargs:Any)->OutputType:"""Async invoke assistant. Args: input: Runnable input dict that can have: content: User message when starting a new run. thread_id: Existing thread to use. run_id: Existing run to use. Should only be supplied when providing the tool output for a required action after an initial invocation. message_metadata: Metadata to associate with a new message. thread_metadata: Metadata to associate with new thread. Only relevant when a new thread is created. instructions: Additional run instructions. model: Override Assistant model for this run. tools: Override Assistant tools for this run. run_metadata: Metadata to associate with new run. config: Runnable config. Defaults to None. kwargs: Additional arguments. Return: If self.as_agent, will return Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]. Otherwise, will return OpenAI types Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]]. 
"""config=configor{}callback_manager=CallbackManager.configure(inheritable_callbacks=config.get("callbacks"),inheritable_tags=config.get("tags"),inheritable_metadata=config.get("metadata"),)run_manager=callback_manager.on_chain_start(dumpd(self),input,name=config.get("run_name"))try:# Being run within AgentExecutor and there are tool outputs to submit.ifself.as_agentandinput.get("intermediate_steps"):tool_outputs=awaitself._aparse_intermediate_steps(input["intermediate_steps"])run=awaitself.async_client.beta.threads.runs.submit_tool_outputs(**tool_outputs)# Starting a new thread and a new run.elif"thread_id"notininput:thread={"messages":[{"role":"user","content":input["content"],"metadata":input.get("message_metadata"),}],"metadata":input.get("thread_metadata"),}run=awaitself._acreate_thread_and_run(input,thread)# Starting a new run in an existing thread.elif"run_id"notininput:_=awaitself.async_client.beta.threads.messages.create(input["thread_id"],content=input["content"],role="user",metadata=input.get("message_metadata"),)run=awaitself._acreate_run(input)# Submitting tool outputs to an existing run, outside the AgentExecutor# framework.else:run=awaitself.async_client.beta.threads.runs.submit_tool_outputs(**input)run=awaitself._await_for_run(run.id,run.thread_id)exceptBaseExceptionase:run_manager.on_chain_error(e)raiseetry:response=self._get_response(run)exceptBaseExceptionase:run_manager.on_chain_error(e,metadata=run.dict())raiseeelse:run_manager.on_chain_end(response)returnresponse
def_parse_intermediate_steps(self,intermediate_steps:List[Tuple[OpenAIAssistantAction,str]])->dict:last_action,last_output=intermediate_steps[-1]run=self._wait_for_run(last_action.run_id,last_action.thread_id)required_tool_call_ids=set()ifrun.required_action:required_tool_call_ids={tc.idfortcinrun.required_action.submit_tool_outputs.tool_calls}tool_outputs=[{"output":str(output),"tool_call_id":action.tool_call_id}foraction,outputinintermediate_stepsifaction.tool_call_idinrequired_tool_call_ids]submit_tool_outputs={"tool_outputs":tool_outputs,"run_id":last_action.run_id,"thread_id":last_action.thread_id,}returnsubmit_tool_outputsdef_create_run(self,input:dict)->Any:params={k:vfork,vininput.items()ifkin("instructions","model","tools","run_metadata")}returnself.client.beta.threads.runs.create(input["thread_id"],assistant_id=self.assistant_id,**params,)def_create_thread_and_run(self,input:dict,thread:dict)->Any:params={k:vfork,vininput.items()ifkin("instructions","model","tools","run_metadata")}run=self.client.beta.threads.create_and_run(assistant_id=self.assistant_id,thread=thread,**params,)returnrundef_get_response(self,run:Any)->Any:# TODO: 
Paginationifrun.status=="completed":importopenaimajor_version=int(openai.version.VERSION.split(".")[0])minor_version=int(openai.version.VERSION.split(".")[1])version_gte_1_14=(major_version>1)or(major_version==1andminor_version>=14)messages=self.client.beta.threads.messages.list(run.thread_id,order="asc")new_messages=[msgformsginmessagesifmsg.run_id==run.id]ifnotself.as_agent:returnnew_messagesanswer:Any=[msg_contentformsginnew_messagesformsg_contentinmsg.content]ifall((isinstance(content,openai.types.beta.threads.TextContentBlock)ifversion_gte_1_14elseisinstance(content,openai.types.beta.threads.MessageContentText))forcontentinanswer):answer="\n".join(content.text.valueforcontentinanswer)returnOpenAIAssistantFinish(return_values={"output":answer,"thread_id":run.thread_id,"run_id":run.id,},log="",run_id=run.id,thread_id=run.thread_id,)elifrun.status=="requires_action":ifnotself.as_agent:returnrun.required_action.submit_tool_outputs.tool_callsactions=[]fortool_callinrun.required_action.submit_tool_outputs.tool_calls:function=tool_call.functiontry:args=json.loads(function.arguments,strict=False)exceptJSONDecodeErrorase:raiseValueError(f"Received invalid JSON function arguments: "f"{function.arguments} for function {function.name}")fromeiflen(args)==1and"__arg1"inargs:args=args["__arg1"]actions.append(OpenAIAssistantAction(tool=function.name,tool_input=args,tool_call_id=tool_call.id,log="",run_id=run.id,thread_id=run.thread_id,))returnactionselse:run_info=json.dumps(run.dict(),indent=2)raiseValueError(f"Unexpected run status: {run.status}. 
Full run info:\n\n{run_info})")def_wait_for_run(self,run_id:str,thread_id:str)->Any:in_progress=Truewhilein_progress:run=self.client.beta.threads.runs.retrieve(run_id,thread_id=thread_id)in_progress=run.statusin("in_progress","queued")ifin_progress:sleep(self.check_every_ms/1000)returnrunasyncdef_aparse_intermediate_steps(self,intermediate_steps:List[Tuple[OpenAIAssistantAction,str]])->dict:last_action,last_output=intermediate_steps[-1]run=awaitself._wait_for_run(last_action.run_id,last_action.thread_id)required_tool_call_ids=set()ifrun.required_action:required_tool_call_ids={tc.idfortcinrun.required_action.submit_tool_outputs.tool_calls}tool_outputs=[{"output":str(output),"tool_call_id":action.tool_call_id}foraction,outputinintermediate_stepsifaction.tool_call_idinrequired_tool_call_ids]submit_tool_outputs={"tool_outputs":tool_outputs,"run_id":last_action.run_id,"thread_id":last_action.thread_id,}returnsubmit_tool_outputsasyncdef_acreate_run(self,input:dict)->Any:params={k:vfork,vininput.items()ifkin("instructions","model","tools","run_metadata")}returnawaitself.async_client.beta.threads.runs.create(input["thread_id"],assistant_id=self.assistant_id,**params,)asyncdef_acreate_thread_and_run(self,input:dict,thread:dict)->Any:params={k:vfork,vininput.items()ifkin("instructions","model","tools","run_metadata")}run=awaitself.async_client.beta.threads.create_and_run(assistant_id=self.assistant_id,thread=thread,**params,)returnrunasyncdef_aget_response(self,run:Any)->Any:# TODO: 
Paginationifrun.status=="completed":importopenaimajor_version=int(openai.version.VERSION.split(".")[0])minor_version=int(openai.version.VERSION.split(".")[1])version_gte_1_14=(major_version>1)or(major_version==1andminor_version>=14)messages=awaitself.async_client.beta.threads.messages.list(run.thread_id,order="asc")new_messages=[msgformsginmessagesifmsg.run_id==run.id]ifnotself.as_agent:returnnew_messagesanswer:Any=[msg_contentformsginnew_messagesformsg_contentinmsg.content]ifall((isinstance(content,openai.types.beta.threads.TextContentBlock)ifversion_gte_1_14elseisinstance(content,openai.types.beta.threads.MessageContentText))forcontentinanswer):answer="\n".join(content.text.valueforcontentinanswer)returnOpenAIAssistantFinish(return_values={"output":answer,"thread_id":run.thread_id,"run_id":run.id,},log="",run_id=run.id,thread_id=run.thread_id,)elifrun.status=="requires_action":ifnotself.as_agent:returnrun.required_action.submit_tool_outputs.tool_callsactions=[]fortool_callinrun.required_action.submit_tool_outputs.tool_calls:function=tool_call.functiontry:args=json.loads(function.arguments,strict=False)exceptJSONDecodeErrorase:raiseValueError(f"Received invalid JSON function arguments: "f"{function.arguments} for function {function.name}")fromeiflen(args)==1and"__arg1"inargs:args=args["__arg1"]actions.append(OpenAIAssistantAction(tool=function.name,tool_input=args,tool_call_id=tool_call.id,log="",run_id=run.id,thread_id=run.thread_id,))returnactionselse:run_info=json.dumps(run.dict(),indent=2)raiseValueError(f"Unexpected run status: {run.status}. Full run info:\n\n{run_info})")asyncdef_await_for_run(self,run_id:str,thread_id:str)->Any:in_progress=Truewhilein_progress:run=awaitself.async_client.beta.threads.runs.retrieve(run_id,thread_id=thread_id)in_progress=run.statusin("in_progress","queued")ifin_progress:awaitasyncio.sleep(self.check_every_ms/1000)returnrun