# Source code for langchain_community.agents.openai_assistant.base
from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Optional,
    Sequence,
    Type,
    Union,
)

from langchain.agents.openai_assistant.base import OpenAIAssistantRunnable, OutputType
from langchain_core._api import beta
from langchain_core.callbacks import CallbackManager
from langchain_core.load import dumpd
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
from langchain_core.runnables import RunnableConfig, ensure_config
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool

if TYPE_CHECKING:
    import openai
    from openai._types import NotGiven
    from openai.types.beta.assistant import ToolResources as AssistantToolResources


def _get_openai_client() -> openai.OpenAI:
    """Return a sync OpenAI client pinned to the Assistants v2 beta API."""
    try:
        import openai

        return openai.OpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        # Older openai releases lack the v1-style OpenAI client class.
        raise AttributeError(
            "Please make sure you are using a v1.23-compatible version of openai. You "
            'can install with `pip install "openai>=1.23"`.'
        ) from e


def _get_openai_async_client() -> openai.AsyncOpenAI:
    """Return an async OpenAI client pinned to the Assistants v2 beta API."""
    try:
        import openai

        return openai.AsyncOpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        # Older openai releases lack the v1-style AsyncOpenAI client class.
        raise AttributeError(
            "Please make sure you are using a v1.23-compatible version of openai. You "
            'can install with `pip install "openai>=1.23"`.'
        ) from e


def _convert_file_ids_into_attachments(file_ids: list) -> list:
    """Convert file_ids into attachments.

    File search and Code interpreter will be turned on by default.

    Args:
        file_ids (list): List of file_ids that need to be converted into attachments.

    Returns:
        A list of attachments that are converted from file_ids.
    """
    # NOTE: renamed loop variable from `id` to avoid shadowing the builtin.
    return [
        {
            "file_id": file_id,
            "tools": [{"type": "file_search"}, {"type": "code_interpreter"}],
        }
        for file_id in file_ids
    ]


def _is_assistants_builtin_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
    """Determine if tool corresponds to OpenAI Assistants built-in.

    Args:
        tool: Tool that needs to be determined.

    Returns:
        A boolean response of true or false indicating if the tool corresponds to
        OpenAI Assistants built-in.
    """
    assistants_builtin_tools = ("code_interpreter", "retrieval", "file_search")
    return (
        isinstance(tool, dict)
        and ("type" in tool)
        and (tool["type"] in assistants_builtin_tools)
    )


def _get_assistants_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
    """Convert a raw function/class to an OpenAI tool.

    Note that OpenAI assistants supports several built-in tools,
    such as "code_interpreter" and "retrieval."

    Args:
        tool: Tools or functions that need to be converted to OpenAI tools.

    Returns:
        A dictionary of tools that are converted into OpenAI tools.
    """
    if _is_assistants_builtin_tool(tool):
        # Built-in tool dicts are passed through to the API unchanged.
        return tool  # type: ignore
    else:
        return convert_to_openai_tool(tool)
@beta()
class OpenAIAssistantV2Runnable(OpenAIAssistantRunnable):
    """Run an OpenAI Assistant.

    Example using OpenAI tools:
        .. code-block:: python

            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable

            interpreter_assistant = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=[{"type": "code_interpreter"}],
                model="gpt-4-1106-preview"
            )
            output = interpreter_assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and AgentExecutor:
        .. code-block:: python

            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
            from langchain.agents import AgentExecutor
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            agent_executor = AgentExecutor(agent=agent, tools=tools)
            agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and custom execution:
        .. code-block:: python

            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
            from langchain.agents import AgentExecutor
            from langchain_core.agents import AgentFinish
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            def execute_agent(agent, tools, input):
                tool_map = {tool.name: tool for tool in tools}
                response = agent.invoke(input)
                while not isinstance(response, AgentFinish):
                    tool_outputs = []
                    for action in response:
                        tool_output = tool_map[action.tool].invoke(action.tool_input)
                        tool_outputs.append(
                            {"output": tool_output, "tool_call_id": action.tool_call_id}
                        )
                    response = agent.invoke(
                        {
                            "tool_outputs": tool_outputs,
                            "run_id": action.run_id,
                            "thread_id": action.thread_id
                        }
                    )

                return response

            response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"})
            next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})

    """  # noqa: E501

    client: Any = Field(default_factory=_get_openai_client)
    """OpenAI or AzureOpenAI client."""
    async_client: Any = None
    """OpenAI or AzureOpenAI async client."""
    assistant_id: str
    """OpenAI assistant id."""
    check_every_ms: float = 1_000.0
    """Frequency with which to check run progress in ms."""
    as_agent: bool = False
    """Use as a LangChain agent, compatible with the AgentExecutor."""

    @root_validator(pre=False, skip_on_failure=True)
    def validate_async_client(cls, values: dict) -> dict:
        """Build a default async client from the sync client's API key if unset."""
        if values["async_client"] is None:
            import openai

            api_key = values["client"].api_key
            values["async_client"] = openai.AsyncOpenAI(api_key=api_key)
        return values

    @classmethod
    def create_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None,
        tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Create an OpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            tool_resources: Assistant tool resources. Can be passed in OpenAI format.
            model: Assistant model to use.
            client: OpenAI or AzureOpenAI client.
                Will create default OpenAI client (Assistant v2) if not specified.

        Returns:
            OpenAIAssistantRunnable configured to run using the created assistant.
        """
        client = client or _get_openai_client()
        if tool_resources is None:
            # NOT_GIVEN tells the openai SDK to omit the field entirely.
            from openai._types import NOT_GIVEN

            tool_resources = NOT_GIVEN
        assistant = client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=[_get_assistants_tool(tool) for tool in tools],  # type: ignore
            tool_resources=tool_resources,
            model=model,
        )
        return cls(assistant_id=assistant.id, client=client, **kwargs)

    def invoke(
        self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> OutputType:
        """Invoke assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial invocation.
                file_ids: (deprecated) File ids to include in new run.
                    Use 'attachments' instead.
                attachments: Assistant files to include in new run. (v2 API).
                message_metadata: Metadata to associate with new message.
                thread_metadata: Metadata to associate with new thread. Only relevant
                    when new thread being created.
                instructions: Additional run instructions.
                model: Override Assistant model for this run.
                tools: Override Assistant tools for this run.
                tool_resources: Override Assistant tool resources for this run
                    (v2 API).
                run_metadata: Metadata to associate with new run.
            config: Runnable config.

        Return:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish].
                Otherwise, will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].

        Raises:
            BaseException: If an error occurs during the invocation.
        """
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        # Deprecated `file_ids` are converted into v2-style attachments and
        # merged with any explicitly-passed attachments.
        files = _convert_file_ids_into_attachments(kwargs.get("file_ids", []))
        attachments = kwargs.get("attachments", []) + files
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = self._parse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs)
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                thread = {
                    "messages": [
                        {
                            "role": "user",
                            "content": input["content"],
                            "attachments": attachments,
                            "metadata": input.get("message_metadata"),
                        }
                    ],
                    "metadata": input.get("thread_metadata"),
                }
                run = self._create_thread_and_run(input, thread)
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                _ = self.client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    attachments=attachments,
                    metadata=input.get("message_metadata"),
                )
                run = self._create_run(input)
            # Submitting tool outputs to an existing run, outside the AgentExecutor
            # framework.
            else:
                run = self.client.beta.threads.runs.submit_tool_outputs(**input)
            run = self._wait_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise e
        try:
            response = self._get_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise e
        else:
            run_manager.on_chain_end(response)
        return response

    @classmethod
    async def acreate_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        async_client: Optional[
            Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]
        ] = None,
        tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Create an AsyncOpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            tool_resources: Assistant tool resources. Can be passed in OpenAI format.
            model: Assistant model to use.
            async_client: AsyncOpenAI client.
                Will create default async_client if not specified.

        Returns:
            AsyncOpenAIAssistantRunnable configured to run using the created assistant.
        """
        async_client = async_client or _get_openai_async_client()
        if tool_resources is None:
            # NOT_GIVEN tells the openai SDK to omit the field entirely.
            from openai._types import NOT_GIVEN

            tool_resources = NOT_GIVEN
        openai_tools = [_get_assistants_tool(tool) for tool in tools]
        assistant = await async_client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=openai_tools,  # type: ignore
            tool_resources=tool_resources,
            model=model,
        )
        return cls(assistant_id=assistant.id, async_client=async_client, **kwargs)

    async def ainvoke(
        self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> OutputType:
        """Async invoke assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial invocation.
                file_ids: (deprecated) File ids to include in new run.
                    Use 'attachments' instead.
                attachments: Assistant files to include in new run. (v2 API).
                message_metadata: Metadata to associate with new message.
                thread_metadata: Metadata to associate with new thread. Only relevant
                    when new thread being created.
                instructions: Additional run instructions.
                model: Override Assistant model for this run.
                tools: Override Assistant tools for this run.
                tool_resources: Override Assistant tool resources for this run
                    (v2 API).
                run_metadata: Metadata to associate with new run.
            config: Runnable config.

        Return:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish].
                Otherwise, will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        # FIX: was `config = config or {}`; use ensure_config for consistency
        # with the sync invoke() and proper RunnableConfig defaulting.
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        # Deprecated `file_ids` are converted into v2-style attachments and
        # merged with any explicitly-passed attachments.
        files = _convert_file_ids_into_attachments(kwargs.get("file_ids", []))
        attachments = kwargs.get("attachments", []) + files
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = self._parse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **tool_outputs
                )
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                thread = {
                    "messages": [
                        {
                            "role": "user",
                            "content": input["content"],
                            "attachments": attachments,
                            "metadata": input.get("message_metadata"),
                        }
                    ],
                    "metadata": input.get("thread_metadata"),
                }
                run = await self._acreate_thread_and_run(input, thread)
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                _ = await self.async_client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    attachments=attachments,
                    metadata=input.get("message_metadata"),
                )
                run = await self._acreate_run(input)
            # Submitting tool outputs to an existing run, outside the AgentExecutor
            # framework.
            else:
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **input
                )
            run = await self._await_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise e
        try:
            response = self._get_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise e
        else:
            run_manager.on_chain_end(response)
        return response