from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Optional,
Sequence,
Type,
Union,
)
from langchain.agents.openai_assistant.base import OpenAIAssistantRunnable, OutputType
from langchain_core._api import beta
from langchain_core.callbacks import CallbackManager
from langchain_core.load import dumpd
from langchain_core.runnables import RunnableConfig, ensure_config
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
from pydantic import BaseModel, Field, model_validator
from typing_extensions import Self
if TYPE_CHECKING:
import openai
from openai._types import NotGiven
from openai.types.beta.assistant import ToolResources as AssistantToolResources
def _get_openai_client() -> openai.OpenAI:
"""Get the OpenAI client.
Returns:
openai.OpenAI: OpenAI client
Raises:
ImportError: If `openai` is not installed.
AttributeError: If the installed `openai` version is not compatible.
"""
try:
import openai
return openai.OpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
except ImportError as e:
raise ImportError(
"Unable to import openai, please install with `pip install openai`."
) from e
except AttributeError as e:
raise AttributeError(
"Please make sure you are using a v1.23-compatible version of openai. You "
'can install with `pip install "openai>=1.23"`.'
) from e
def _get_openai_async_client() -> openai.AsyncOpenAI:
"""Get the async OpenAI client.
Returns:
openai.AsyncOpenAI: Async OpenAI client
Raises:
ImportError: If `openai` is not installed.
AttributeError: If the installed `openai` version is not compatible.
"""
try:
import openai
return openai.AsyncOpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
except ImportError as e:
raise ImportError(
"Unable to import openai, please install with `pip install openai`."
) from e
except AttributeError as e:
raise AttributeError(
"Please make sure you are using a v1.23-compatible version of openai. You "
'can install with `pip install "openai>=1.23"`.'
) from e
def _convert_file_ids_into_attachments(file_ids: list) -> list:
"""Convert file_ids into attachments
File search and Code interpreter will be turned on by default.
Args:
file_ids (list): List of file_ids that need to be converted into attachments.
Returns:
list: List of attachments converted from file_ids.
"""
attachments = []
for id in file_ids:
attachments.append(
{
"file_id": id,
"tools": [{"type": "file_search"}, {"type": "code_interpreter"}],
}
)
return attachments
def _is_assistants_builtin_tool(
tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
"""Determine if tool corresponds to OpenAI Assistants built-in.
Args:
tool (Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]):
Tool that needs to be determined.
Returns:
A boolean response of true or false indicating if the tool corresponds to
OpenAI Assistants built-in.
"""
assistants_builtin_tools = ("code_interpreter", "retrieval", "file_search")
return (
isinstance(tool, dict)
and ("type" in tool)
and (tool["type"] in assistants_builtin_tools)
)
def _get_assistants_tool(
tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
"""Convert a raw function/class to an OpenAI tool.
Note that OpenAI assistants supports several built-in tools,
such as "code_interpreter" and "retrieval."
Args:
tool (Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]):
Tools or functions that need to be converted to OpenAI tools.
Returns:
Dict[str, Any]: A dictionary of tools that are converted into OpenAI tools.
"""
if _is_assistants_builtin_tool(tool):
return tool # type: ignore
else:
return convert_to_openai_tool(tool)
[docs]
@beta()
class OpenAIAssistantV2Runnable(OpenAIAssistantRunnable):
"""Run an OpenAI Assistant.
Attributes:
client (Any): OpenAI or AzureOpenAI client.
async_client (Any): Async OpenAI or AzureOpenAI client.
assistant_id (str): OpenAI assistant ID.
check_every_ms (float): Frequency to check progress in milliseconds.
as_agent (bool): Whether to use the assistant as a LangChain agent.
Example using OpenAI tools:
.. code-block:: python
from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
assistant = OpenAIAssistantV2Runnable.create_assistant(
name="math assistant",
instructions="You are a personal math tutor. Write and run code to answer math questions.",
tools=[{"type": "code_interpreter"}],
model="gpt-4-1106-preview"
)
output = assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})
Example using custom tools and AgentExecutor:
.. code-block:: python
from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
from langchain.agents import AgentExecutor
from langchain.tools import E2BDataAnalysisTool
tools = [E2BDataAnalysisTool(api_key="...")]
agent = OpenAIAssistantV2Runnable.create_assistant(
name="langchain assistant e2b tool",
instructions="You are a personal math tutor. Write and run code to answer math questions.",
tools=tools,
model="gpt-4-1106-preview",
as_agent=True
)
agent_executor = AgentExecutor(agent=agent, tools=tools)
agent_executor.invoke({"content": "Analyze the data..."})
Example using custom tools and custom execution:
.. code-block:: python
from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
from langchain.agents import AgentExecutor
from langchain_core.agents import AgentFinish
from langchain.tools import E2BDataAnalysisTool
tools = [E2BDataAnalysisTool(api_key="...")]
agent = OpenAIAssistantV2Runnable.create_assistant(
name="langchain assistant e2b tool",
instructions="You are a personal math tutor. Write and run code to answer math questions.",
tools=tools,
model="gpt-4-1106-preview",
as_agent=True
)
def execute_agent(agent, tools, input):
tool_map = {tool.name: tool for tool in tools}
response = agent.invoke(input)
while not isinstance(response, AgentFinish):
tool_outputs = []
for action in response:
tool_output = tool_map[action.tool].invoke(action.tool_input)
tool_outputs.append({"output": tool_output, "tool_call_id": action.tool_call_id})
response = agent.invoke(
{
"tool_outputs": tool_outputs,
"run_id": action.run_id,
"thread_id": action.thread_id
}
)
return response
response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"})
next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})
""" # noqa: E501
client: Any = Field(default_factory=_get_openai_client)
"""OpenAI or AzureOpenAI client."""
async_client: Any = None
"""OpenAI or AzureOpenAI async client."""
assistant_id: str
"""OpenAI assistant id."""
check_every_ms: float = 1_000.0
"""Frequency with which to check run progress in milliseconds."""
as_agent: bool = False
"""Use as a LangChain agent, compatible with the AgentExecutor."""
@model_validator(mode="after")
def validate_async_client(self) -> Self:
"""Validate that the async client is set, otherwise initialize it."""
if self.async_client is None:
import openai
api_key = self.client.api_key
self.async_client = openai.AsyncOpenAI(api_key=api_key)
return self
[docs]
@classmethod
def create_assistant(
cls,
name: str,
instructions: str,
tools: Sequence[Union[BaseTool, dict]],
model: str,
*,
model_kwargs: dict[str, float] = {},
client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None,
tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None,
extra_body: Optional[object] = None,
**kwargs: Any,
) -> OpenAIAssistantRunnable:
"""Create an OpenAI Assistant and instantiate the Runnable.
Args:
name (str): Assistant name.
instructions (str): Assistant instructions.
tools (Sequence[Union[BaseTool, dict]]): Assistant tools. Can be passed
in OpenAI format or as BaseTools.
tool_resources (Optional[Union[AssistantToolResources, dict, NotGiven]]):
Assistant tool resources. Can be passed in OpenAI format.
model (str): Assistant model to use.
client (Optional[Union[openai.OpenAI, openai.AzureOpenAI]]): OpenAI or
AzureOpenAI client. Will create default OpenAI client (Assistant v2)
if not specified.
model_kwargs: Additional model arguments. Only available for temperature
and top_p parameters.
extra_body: Additional body parameters to be passed to the assistant.
Returns:
OpenAIAssistantRunnable: The configured assistant runnable.
"""
client = client or _get_openai_client()
if tool_resources is None:
from openai._types import NOT_GIVEN
tool_resources = NOT_GIVEN
assistant = client.beta.assistants.create(
name=name,
instructions=instructions,
tools=[_get_assistants_tool(tool) for tool in tools], # type: ignore
tool_resources=tool_resources, # type: ignore[arg-type]
model=model,
extra_body=extra_body,
**model_kwargs,
)
return cls(assistant_id=assistant.id, client=client, **kwargs)
[docs]
def invoke(
self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> OutputType:
"""Invoke the assistant.
Args:
input (dict): Runnable input dict that can have:
content: User message when starting a new run.
thread_id: Existing thread to use.
run_id: Existing run to use. Should only be supplied when providing
the tool output for a required action after an initial invocation.
file_ids: (deprecated) File ids to include in new run. Use
'attachments' instead
attachments: Assistant files to include in new run. (v2 API).
message_metadata: Metadata to associate with new message.
thread_metadata: Metadata to associate with new thread. Only relevant
when new thread being created.
instructions: Additional run instructions.
model: Override Assistant model for this run.
tools: Override Assistant tools for this run.
tool_resources: Override Assistant tool resources for this run (v2 API).
run_metadata: Metadata to associate with new run.
config (Optional[RunnableConfig]): Configuration for the run.
Returns:
OutputType: If self.as_agent, will return
Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]. Otherwise,
will return OpenAI types
Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
Raises:
BaseException: If an error occurs during the invocation.
"""
config = ensure_config(config)
callback_manager = CallbackManager.configure(
inheritable_callbacks=config.get("callbacks"),
inheritable_tags=config.get("tags"),
inheritable_metadata=config.get("metadata"),
)
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=config.get("run_name") or self.get_name()
)
files = _convert_file_ids_into_attachments(kwargs.get("file_ids", []))
attachments = kwargs.get("attachments", []) + files
try:
# Being run within AgentExecutor and there are tool outputs to submit.
if self.as_agent and input.get("intermediate_steps"):
tool_outputs = self._parse_intermediate_steps(
input["intermediate_steps"]
)
run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs)
# Starting a new thread and a new run.
elif "thread_id" not in input:
thread = {
"messages": [
{
"role": "user",
"content": input["content"],
"attachments": attachments,
"metadata": input.get("message_metadata"),
}
],
"metadata": input.get("thread_metadata"),
}
run = self._create_thread_and_run(input, thread)
# Starting a new run in an existing thread.
elif "run_id" not in input:
_ = self.client.beta.threads.messages.create(
input["thread_id"],
content=input["content"],
role="user",
attachments=attachments,
metadata=input.get("message_metadata"),
)
run = self._create_run(input)
# Submitting tool outputs to an existing run, outside the AgentExecutor
# framework.
else:
run = self.client.beta.threads.runs.submit_tool_outputs(**input)
run = self._wait_for_run(run.id, run.thread_id)
except BaseException as e:
run_manager.on_chain_error(e)
raise e
try:
response = self._get_response(run)
except BaseException as e:
run_manager.on_chain_error(e, metadata=run.dict())
raise e
else:
run_manager.on_chain_end(response)
return response
[docs]
@classmethod
async def acreate_assistant(
cls,
name: str,
instructions: str,
tools: Sequence[Union[BaseTool, dict]],
model: str,
*,
async_client: Optional[
Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]
] = None,
tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None,
**kwargs: Any,
) -> OpenAIAssistantRunnable:
"""Create an AsyncOpenAI Assistant and instantiate the Runnable.
Args:
name (str): Assistant name.
instructions (str): Assistant instructions.
tools (Sequence[Union[BaseTool, dict]]): Assistant tools. Can be passed
in OpenAI format or as BaseTools.
tool_resources (Optional[Union[AssistantToolResources, dict, NotGiven]]):
Assistant tool resources. Can be passed in OpenAI format.
model (str): Assistant model to use.
async_client (Optional[Union[openai.OpenAI, openai.AzureOpenAI]]): OpenAI or
AzureOpenAI async client. Will create default async_client if not specified.
Returns:
AsyncOpenAIAssistantRunnable: The configured assistant runnable.
"""
async_client = async_client or _get_openai_async_client()
if tool_resources is None:
from openai._types import NOT_GIVEN
tool_resources = NOT_GIVEN
openai_tools = [_get_assistants_tool(tool) for tool in tools]
assistant = await async_client.beta.assistants.create(
name=name,
instructions=instructions,
tools=openai_tools, # type: ignore
tool_resources=tool_resources, # type: ignore[arg-type]
model=model,
)
return cls(assistant_id=assistant.id, async_client=async_client, **kwargs)
[docs]
async def ainvoke(
self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> OutputType:
"""Async invoke assistant.
Args:
input (dict): Runnable input dict that can have:
content: User message when starting a new run.
thread_id: Existing thread to use.
run_id: Existing run to use. Should only be supplied when providing
the tool output for a required action after an initial invocation.
file_ids: (deprecated) File ids to include in new run. Use
'attachments' instead
attachments: Assistant files to include in new run. (v2 API).
message_metadata: Metadata to associate with new message.
thread_metadata: Metadata to associate with new thread. Only relevant
when new thread being created.
instructions: Additional run instructions.
model: Override Assistant model for this run.
tools: Override Assistant tools for this run.
tool_resources: Override Assistant tool resources for this run (v2 API).
run_metadata: Metadata to associate with new run.
config (Optional[RunnableConfig]): Configuration for the run.
Returns:
OutputType: If self.as_agent, will return
Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]. Otherwise,
will return OpenAI types
Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
Raises:
BaseException: If an error occurs during the invocation.
"""
config = config or {}
callback_manager = CallbackManager.configure(
inheritable_callbacks=config.get("callbacks"),
inheritable_tags=config.get("tags"),
inheritable_metadata=config.get("metadata"),
)
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=config.get("run_name") or self.get_name()
)
files = _convert_file_ids_into_attachments(kwargs.get("file_ids", []))
attachments = kwargs.get("attachments", []) + files
try:
# Being run within AgentExecutor and there are tool outputs to submit.
if self.as_agent and input.get("intermediate_steps"):
tool_outputs = self._parse_intermediate_steps(
input["intermediate_steps"]
)
run = await self.async_client.beta.threads.runs.submit_tool_outputs(
**tool_outputs
)
# Starting a new thread and a new run.
elif "thread_id" not in input:
thread = {
"messages": [
{
"role": "user",
"content": input["content"],
"attachments": attachments,
"metadata": input.get("message_metadata"),
}
],
"metadata": input.get("thread_metadata"),
}
run = await self._acreate_thread_and_run(input, thread)
# Starting a new run in an existing thread.
elif "run_id" not in input:
_ = await self.async_client.beta.threads.messages.create(
input["thread_id"],
content=input["content"],
role="user",
attachments=attachments,
metadata=input.get("message_metadata"),
)
run = await self._acreate_run(input)
# Submitting tool outputs to an existing run, outside the AgentExecutor
# framework.
else:
run = await self.async_client.beta.threads.runs.submit_tool_outputs(
**input
)
run = await self._await_for_run(run.id, run.thread_id)
except BaseException as e:
run_manager.on_chain_error(e)
raise e
try:
response = self._get_response(run)
except BaseException as e:
run_manager.on_chain_error(e, metadata=run.dict())
raise e
else:
run_manager.on_chain_end(response)
return response
def _create_run(self, input: dict) -> Any:
"""Create a new run within an existing thread.
Args:
input (dict): The input data for the new run.
Returns:
Any: The created run object.
"""
params = {
k: v
for k, v in input.items()
if k in ("instructions", "model", "tools", "tool_resources", "run_metadata")
}
return self.client.beta.threads.runs.create(
input["thread_id"],
assistant_id=self.assistant_id,
**params,
)
def _create_thread_and_run(self, input: dict, thread: dict) -> Any:
"""Create a new thread and run.
Args:
input (dict): The input data for the run.
thread (dict): The thread data to create.
Returns:
Any: The created thread and run.
"""
params = {
k: v
for k, v in input.items()
if k in ("instructions", "model", "tools", "run_metadata")
}
if tool_resources := input.get("tool_resources"):
thread["tool_resources"] = tool_resources
run = self.client.beta.threads.create_and_run(
assistant_id=self.assistant_id,
thread=thread,
**params,
)
return run
async def _acreate_run(self, input: dict) -> Any:
"""Asynchronously create a new run within an existing thread.
Args:
input (dict): The input data for the new run.
Returns:
Any: The created run object.
"""
params = {
k: v
for k, v in input.items()
if k in ("instructions", "model", "tools", "tool_resources", "run_metadata")
}
return await self.async_client.beta.threads.runs.create(
input["thread_id"],
assistant_id=self.assistant_id,
**params,
)
async def _acreate_thread_and_run(self, input: dict, thread: dict) -> Any:
"""Asynchronously create a new thread and run simultaneously.
Args:
input (dict): The input data for the run.
thread (dict): The thread data to create.
Returns:
Any: The created thread and run.
"""
params = {
k: v
for k, v in input.items()
if k in ("instructions", "model", "tools", "run_metadata")
}
if tool_resources := input.get("tool_resources"):
thread["tool_resources"] = tool_resources
run = await self.async_client.beta.threads.create_and_run(
assistant_id=self.assistant_id,
thread=thread,
**params,
)
return run