"""IBM watsonx.ai large language chat models wrapper."""
import hashlib
import json
import logging
from operator import itemgetter
from typing import (
Any,
Callable,
Dict,
Iterator,
List,
Literal,
Mapping,
Optional,
Sequence,
Tuple,
Type,
TypedDict,
Union,
cast,
)
from ibm_watsonx_ai import APIClient, Credentials # type: ignore
from ibm_watsonx_ai.foundation_models import ModelInference # type: ignore
from ibm_watsonx_ai.foundation_models.schema import ( # type: ignore
BaseSchema,
TextChatParameters,
)
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models import LanguageModelInput
from langchain_core.language_models.chat_models import (
BaseChatModel,
LangSmithParams,
generate_from_stream,
)
from langchain_core.messages import (
AIMessage,
AIMessageChunk,
BaseMessage,
BaseMessageChunk,
ChatMessage,
ChatMessageChunk,
FunctionMessage,
FunctionMessageChunk,
HumanMessage,
HumanMessageChunk,
InvalidToolCall,
SystemMessage,
SystemMessageChunk,
ToolCall,
ToolMessage,
ToolMessageChunk,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.messages.tool import tool_call_chunk
from langchain_core.output_parsers import JsonOutputParser, PydanticOutputParser
from langchain_core.output_parsers.base import OutputParserLike
from langchain_core.output_parsers.openai_tools import (
JsonOutputKeyToolsParser,
PydanticToolsParser,
make_invalid_tool_call,
parse_tool_call,
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import Runnable, RunnableMap, RunnablePassthrough
from langchain_core.tools import BaseTool
from langchain_core.utils._merge import merge_dicts
from langchain_core.utils.function_calling import (
convert_to_openai_function,
convert_to_openai_tool,
)
from langchain_core.utils.pydantic import is_basemodel_subclass
from langchain_core.utils.utils import secret_from_env
from pydantic import BaseModel, ConfigDict, Field, SecretStr, model_validator
from typing_extensions import Self
from langchain_ibm.utils import (
check_duplicate_chat_params,
check_for_attribute,
extract_chat_params,
)
logger = logging.getLogger(__name__)
def _convert_dict_to_message(_dict: Mapping[str, Any], call_id: str) -> BaseMessage:
"""Convert a dictionary to a LangChain message.
Args:
_dict: The dictionary.
call_id: The ID to assign to the returned message.
Returns:
The LangChain message.
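
Example:

A minimal sketch with a hypothetical payload:

.. code-block:: python

    _convert_dict_to_message({"role": "user", "content": "Hi"}, "chat-123")
    # -> HumanMessage(content="Hi", id="chat-123")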
"""
role = _dict.get("role")
name = _dict.get("name")
id_ = call_id
if role == "user":
return HumanMessage(content=_dict.get("content", ""), id=id_, name=name)
elif role == "assistant":
content = _dict.get("content", "") or ""
additional_kwargs: Dict = {}
if function_call := _dict.get("function_call"):
additional_kwargs["function_call"] = dict(function_call)
tool_calls = []
invalid_tool_calls = []
if raw_tool_calls := _dict.get("tool_calls"):
additional_kwargs["tool_calls"] = raw_tool_calls
for raw_tool_call in raw_tool_calls:
try:
tool_calls.append(parse_tool_call(raw_tool_call, return_id=True))
except Exception as e:
invalid_tool_calls.append(
make_invalid_tool_call(raw_tool_call, str(e))
)
return AIMessage(
content=content,
additional_kwargs=additional_kwargs,
name=name,
id=id_,
tool_calls=tool_calls,
invalid_tool_calls=invalid_tool_calls,
)
elif role == "system":
return SystemMessage(content=_dict.get("content", ""), name=name, id=id_)
elif role == "function":
return FunctionMessage(
content=_dict.get("content", ""), name=cast(str, _dict.get("name")), id=id_
)
elif role == "tool":
additional_kwargs = {}
if "name" in _dict:
additional_kwargs["name"] = _dict["name"]
return ToolMessage(
content=_dict.get("content", ""),
tool_call_id=cast(str, _dict.get("tool_call_id")),
additional_kwargs=additional_kwargs,
name=name,
id=id_,
)
else:
return ChatMessage(content=_dict.get("content", ""), role=role, id=id_) # type: ignore[arg-type]
def _format_message_content(content: Any) -> Any:
"""Format message content."""
if content and isinstance(content, list):
# Remove unexpected block types
formatted_content = []
for block in content:
if (
isinstance(block, dict)
and "type" in block
and block["type"] == "tool_use"
):
continue
else:
formatted_content.append(block)
else:
formatted_content = content
return formatted_content
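# Example of the filtering above with a hypothetical content list:
#     _format_message_content(
#         [{"type": "text", "text": "hi"}, {"type": "tool_use", "id": "x"}]
#     )
#     # -> [{"type": "text", "text": "hi"}]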
def _base62_encode(num: int) -> str:
"""Encodes a number in base62 and ensures result is of a specified length."""
base62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
if num == 0:
return base62[0]
arr = []
base = len(base62)
while num:
num, rem = divmod(num, base)
arr.append(base62[rem])
arr.reverse()
return "".join(arr)
def _convert_tool_call_id_to_mistral_compatible(tool_call_id: str) -> str:
"""Convert a tool call ID to a Mistral-compatible format"""
hash_bytes = hashlib.sha256(tool_call_id.encode()).digest()
hash_int = int.from_bytes(hash_bytes, byteorder="big")
base62_str = _base62_encode(hash_int)
if len(base62_str) >= 9:
return base62_str[:9]
else:
return base62_str.rjust(9, "0")
def _convert_message_to_dict(message: BaseMessage, model_id: str | None) -> dict:
"""Convert a LangChain message to a dictionary.
Args:
message: The LangChain message.
model_id: ID of the model, used to apply model-specific workarounds.
Returns:
The dictionary.
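
Example:

A minimal sketch with a hypothetical message:

.. code-block:: python

    _convert_message_to_dict(HumanMessage(content="Hi"), model_id=None)
    # -> {"content": "Hi", "role": "user"}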
"""
message_dict: Dict[str, Any] = {"content": _format_message_content(message.content)}
if (name := message.name or message.additional_kwargs.get("name")) is not None:
message_dict["name"] = name
# populate role and additional message data
if isinstance(message, ChatMessage):
message_dict["role"] = message.role
elif isinstance(message, HumanMessage):
message_dict["role"] = "user"
elif isinstance(message, AIMessage):
message_dict["role"] = "assistant"
if "function_call" in message.additional_kwargs:
message_dict["function_call"] = message.additional_kwargs["function_call"]
if message.tool_calls or message.invalid_tool_calls:
message_dict["tool_calls"] = [
_lc_tool_call_to_watsonx_tool_call(tc) for tc in message.tool_calls
] + [
_lc_invalid_tool_call_to_watsonx_tool_call(tc)
for tc in message.invalid_tool_calls
]
elif "tool_calls" in message.additional_kwargs:
message_dict["tool_calls"] = message.additional_kwargs["tool_calls"]
tool_call_supported_props = {"id", "type", "function"}
message_dict["tool_calls"] = [
{k: v for k, v in tool_call.items() if k in tool_call_supported_props}
for tool_call in message_dict["tool_calls"]
]
else:
pass
# If tool calls present, content null value should be None not empty string.
if "function_call" in message_dict or "tool_calls" in message_dict:
message_dict["content"] = message_dict["content"] or None
# Workaround for "mistralai/mistral-large" model when id < 9
if model_id and model_id.startswith("mistralai"):
tool_calls = message_dict.get("tool_calls", [])
if (
isinstance(tool_calls, list)
and tool_calls
and isinstance(tool_calls[0], dict)
):
tool_call_id = tool_calls[0].get("id", "")
if len(tool_call_id) < 9:
tool_call_id = _convert_tool_call_id_to_mistral_compatible(
tool_call_id
)
message_dict["tool_calls"][0]["id"] = tool_call_id
elif isinstance(message, SystemMessage):
message_dict["role"] = "system"
elif isinstance(message, FunctionMessage):
message_dict["role"] = "function"
elif isinstance(message, ToolMessage):
message_dict["role"] = "tool"
message_dict["tool_call_id"] = message.tool_call_id
# Workaround for "mistralai/mistral-large" model when tool_call_id < 9
if model_id and model_id.startswith("mistralai"):
tool_call_id = message_dict.get("tool_call_id", "")
if len(tool_call_id) < 9:
tool_call_id = _convert_tool_call_id_to_mistral_compatible(tool_call_id)
message_dict["tool_call_id"] = tool_call_id
supported_props = {"content", "role", "tool_call_id"}
message_dict = {k: v for k, v in message_dict.items() if k in supported_props}
else:
raise TypeError(f"Got unknown type {message}")
return message_dict
def _convert_delta_to_message_chunk(
_dict: Mapping[str, Any],
default_class: Type[BaseMessageChunk],
call_id: str,
is_first_tool_chunk: bool,
) -> BaseMessageChunk:
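"""Convert a streamed delta dict to the matching message chunk class.

The tool call name and ID are attached only on the first tool chunk so
that later merging of chunks does not duplicate them.
"""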
id_ = call_id
role = cast(str, _dict.get("role"))
content = cast(str, _dict.get("content") or "")
additional_kwargs: Dict = {}
if _dict.get("function_call"):
function_call = dict(_dict["function_call"])
if "name" in function_call and function_call["name"] is None:
function_call["name"] = ""
additional_kwargs["function_call"] = function_call
tool_call_chunks = []
if raw_tool_calls := _dict.get("tool_calls"):
additional_kwargs["tool_calls"] = raw_tool_calls
try:
tool_call_chunks = [
tool_call_chunk(
name=rtc["function"].get("name") if is_first_tool_chunk else None,
args=rtc["function"].get("arguments"),
id=rtc.get("id") if is_first_tool_chunk else None,
index=rtc["index"],
)
for rtc in raw_tool_calls
]
except KeyError:
pass
if role == "user" or default_class == HumanMessageChunk:
return HumanMessageChunk(content=content, id=id_)
elif role == "assistant" or default_class == AIMessageChunk:
return AIMessageChunk(
content=content,
additional_kwargs=additional_kwargs,
id=id_,
tool_call_chunks=tool_call_chunks, # type: ignore[arg-type]
)
elif role == "system" or default_class == SystemMessageChunk:
return SystemMessageChunk(content=content, id=id_)
elif role == "function" or default_class == FunctionMessageChunk:
return FunctionMessageChunk(content=content, name=_dict["name"], id=id_)
elif role == "tool" or default_class == ToolMessageChunk:
return ToolMessageChunk(
content=content, tool_call_id=_dict["tool_call_id"], id=id_
)
elif role or default_class == ChatMessageChunk:
return ChatMessageChunk(content=content, role=role, id=id_)
else:
return default_class(content=content, id=id_) # type: ignore
def _convert_chunk_to_generation_chunk(
chunk: dict,
default_chunk_class: Type,
base_generation_info: Optional[Dict],
is_first_chunk: bool,
is_first_tool_chunk: bool,
) -> Optional[ChatGenerationChunk]:
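"""Convert a raw streamed chunk into a ``ChatGenerationChunk``.

Returns ``None`` when the chunk's first choice carries no delta to merge.
"""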
token_usage = chunk.get("usage")
choices = chunk.get("choices", [])
usage_metadata: Optional[UsageMetadata] = (
_create_usage_metadata(token_usage, is_first_chunk) if token_usage else None
)
if len(choices) == 0:
# logprobs is implicitly None
generation_chunk = ChatGenerationChunk(
message=default_chunk_class(content="", usage_metadata=usage_metadata)
)
return generation_chunk
choice = choices[0]
if choice["delta"] is None:
return None
message_chunk = _convert_delta_to_message_chunk(
choice["delta"], default_chunk_class, chunk["id"], is_first_tool_chunk
)
generation_info = {**base_generation_info} if base_generation_info else {}
if finish_reason := choice.get("finish_reason"):
generation_info["finish_reason"] = finish_reason
if model_name := chunk.get("model"):
generation_info["model_name"] = model_name
if system_fingerprint := chunk.get("system_fingerprint"):
generation_info["system_fingerprint"] = system_fingerprint
logprobs = choice.get("logprobs")
if logprobs:
generation_info["logprobs"] = logprobs
if usage_metadata and isinstance(message_chunk, AIMessageChunk):
message_chunk.usage_metadata = usage_metadata
generation_chunk = ChatGenerationChunk(
message=message_chunk, generation_info=generation_info or None
)
return generation_chunk
class _FunctionCall(TypedDict):
name: str
class ChatWatsonx(BaseChatModel):
"""IBM watsonx.ai large language chat models.
.. dropdown:: Setup
:open:
To use, you should have the ``langchain_ibm`` Python package installed,
and the environment variable ``WATSONX_APIKEY`` set with your API key, or pass
it as a named parameter to the constructor.
.. code-block:: bash
pip install -U langchain-ibm
export WATSONX_APIKEY="your-api-key"
Example:
.. code-block:: python
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames
parameters = {
GenTextParamsMetaNames.DECODING_METHOD: "sample",
GenTextParamsMetaNames.MAX_NEW_TOKENS: 100,
GenTextParamsMetaNames.MIN_NEW_TOKENS: 1,
GenTextParamsMetaNames.TEMPERATURE: 0.5,
GenTextParamsMetaNames.TOP_K: 50,
GenTextParamsMetaNames.TOP_P: 1,
}
from langchain_ibm import ChatWatsonx
watsonx_llm = ChatWatsonx(
model_id="meta-llama/llama-3-70b-instruct",
url="https://us-south.ml.cloud.ibm.com",
apikey="*****",
project_id="*****",
params=parameters,
)
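
Invocation follows the standard LangChain chat interface, for example:

.. code-block:: python

    from langchain_core.messages import HumanMessage

    watsonx_llm.invoke([HumanMessage(content="Which city is the capital of France?")])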
"""
model_id: Optional[str] = None
"""Type of model to use."""
deployment_id: Optional[str] = None
"""Type of deployed model to use."""
project_id: Optional[str] = None
"""ID of the Watson Studio project."""
space_id: Optional[str] = None
"""ID of the Watson Studio space."""
url: SecretStr = Field(
alias="url",
default_factory=secret_from_env("WATSONX_URL", default=None), # type: ignore[assignment]
)
"""URL to the Watson Machine Learning or CPD instance."""
apikey: Optional[SecretStr] = Field(
alias="apikey", default_factory=secret_from_env("WATSONX_APIKEY", default=None)
)
"""API key to the Watson Machine Learning or CPD instance."""
token: Optional[SecretStr] = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
"""Token to the CPD instance."""
password: Optional[SecretStr] = Field(
alias="password",
default_factory=secret_from_env("WATSONX_PASSWORD", default=None),
)
"""Password to the CPD instance."""
username: Optional[SecretStr] = Field(
alias="username",
default_factory=secret_from_env("WATSONX_USERNAME", default=None),
)
"""Username to the CPD instance."""
instance_id: Optional[SecretStr] = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
"""Instance_id of the CPD instance."""
version: Optional[SecretStr] = None
"""Version of the CPD instance."""
params: Optional[Union[dict, TextChatParameters]] = None
"""Model parameters to use during request generation.
Note:
`ValueError` is raised if the same chat generation parameter is provided
both in the params attribute and as a keyword argument."""
frequency_penalty: Optional[float] = None
"""Positive values penalize new tokens based on their existing frequency in the
text so far, decreasing the model's likelihood to repeat the same line verbatim."""
logprobs: Optional[bool] = None
"""Whether to return log probabilities of the output tokens or not.
If true, returns the log probabilities of each output token returned
in the message content."""
top_logprobs: Optional[int] = None
"""An integer specifying the number of most likely tokens to return at each
token position, each with an associated log probability. The option logprobs
must be set to true if this parameter is used."""
max_tokens: Optional[int] = None
"""The maximum number of tokens that can be generated in the chat completion.
The total length of input tokens and generated tokens is limited by the
model's context length."""
n: Optional[int] = None
"""How many chat completion choices to generate for each input message.
Note that you will be charged based on the number of generated tokens across
all of the choices. Keep n as 1 to minimize costs."""
presence_penalty: Optional[float] = None
"""Positive values penalize new tokens based on whether they appear in the
text so far, increasing the model's likelihood to talk about new topics."""
temperature: Optional[float] = None
"""What sampling temperature to use. Higher values like 0.8 will make the
output more random, while lower values like 0.2 will make it more focused
and deterministic.
We generally recommend altering this or top_p but not both."""
response_format: Optional[dict] = None
"""The chat response format parameters."""
top_p: Optional[float] = None
"""An alternative to sampling with temperature, called nucleus sampling,
where the model considers the results of the tokens with top_p probability
mass. So 0.1 means only the tokens comprising the top 10% probability mass
are considered.
We generally recommend altering this or temperature but not both."""
time_limit: Optional[int] = None
"""Time limit in milliseconds - if not completed within this time,
generation will stop."""
verify: Union[str, bool, None] = None
"""You can pass one of following as verify:
* the path to a CA_BUNDLE file
* the path of directory with certificates of trusted CAs
* True - default path to truststore will be taken
* False - no verification will be made"""
validate_model: bool = True
"""Model ID validation."""
streaming: bool = False
"""Whether to stream the results or not."""
watsonx_model: ModelInference = Field(default=None, exclude=True) #: :meta private:
watsonx_client: Optional[APIClient] = Field(default=None, exclude=True)
model_config = ConfigDict(populate_by_name=True)
@classmethod
def is_lc_serializable(cls) -> bool:
return False
@property
def _llm_type(self) -> str:
"""Return the type of chat model."""
return "watsonx-chat"
def _get_ls_params(
self, stop: Optional[List[str]] = None, **kwargs: Any
) -> LangSmithParams:
"""Get standard params for tracing."""
params = super()._get_ls_params(stop=stop, **kwargs)
params["ls_provider"] = "ibm"
if self.model_id:
params["ls_model_name"] = self.model_id
return params
@property
def lc_secrets(self) -> Dict[str, str]:
"""A map of constructor argument names to secret ids.
For example:
{
"url": "WATSONX_URL",
"apikey": "WATSONX_APIKEY",
"token": "WATSONX_TOKEN",
"password": "WATSONX_PASSWORD",
"username": "WATSONX_USERNAME",
"instance_id": "WATSONX_INSTANCE_ID",
}
"""
return {
"url": "WATSONX_URL",
"apikey": "WATSONX_APIKEY",
"token": "WATSONX_TOKEN",
"password": "WATSONX_PASSWORD",
"username": "WATSONX_USERNAME",
"instance_id": "WATSONX_INSTANCE_ID",
}
@model_validator(mode="after")
def validate_environment(self) -> Self:
"""Validate that credentials and python package exists in environment."""
self.params = self.params or {}
if isinstance(self.params, BaseSchema):
self.params = self.params.to_dict()
check_duplicate_chat_params(self.params, self.__dict__)
self.params.update(
{
k: v
for k, v in {
"frequency_penalty": self.frequency_penalty,
"logprobs": self.logprobs,
"top_logprobs": self.top_logprobs,
"max_tokens": self.max_tokens,
"n": self.n,
"presence_penalty": self.presence_penalty,
"response_format": self.response_format,
"temperature": self.temperature,
"top_p": self.top_p,
"time_limit": self.time_limit,
}.items()
if v is not None
}
)
if isinstance(self.watsonx_client, APIClient):
watsonx_model = ModelInference(
model_id=self.model_id,
params=self.params,
api_client=self.watsonx_client,
project_id=self.project_id,
space_id=self.space_id,
verify=self.verify,
validate=self.validate_model,
)
self.watsonx_model = watsonx_model
else:
check_for_attribute(self.url, "url", "WATSONX_URL")
if "cloud.ibm.com" in self.url.get_secret_value():
if not self.token and not self.apikey:
raise ValueError(
"Did not find 'apikey' or 'token',"
" please add an environment variable"
" `WATSONX_APIKEY` or 'WATSONX_TOKEN' "
"which contains it,"
" or pass 'apikey' or 'token'"
" as a named parameter."
)
else:
if not self.token and not self.password and not self.apikey:
raise ValueError(
"Did not find 'token', 'password' or 'apikey',"
" please add an environment variable"
" `WATSONX_TOKEN`, 'WATSONX_PASSWORD' or 'WATSONX_APIKEY' "
"which contains it,"
" or pass 'token', 'password' or 'apikey'"
" as a named parameter."
)
elif self.token:
check_for_attribute(self.token, "token", "WATSONX_TOKEN")
elif self.password:
check_for_attribute(self.password, "password", "WATSONX_PASSWORD")
check_for_attribute(self.username, "username", "WATSONX_USERNAME")
elif self.apikey:
check_for_attribute(self.apikey, "apikey", "WATSONX_APIKEY")
check_for_attribute(self.username, "username", "WATSONX_USERNAME")
if not self.instance_id:
check_for_attribute(
self.instance_id, "instance_id", "WATSONX_INSTANCE_ID"
)
credentials = Credentials(
url=self.url.get_secret_value() if self.url else None,
api_key=self.apikey.get_secret_value() if self.apikey else None,
token=self.token.get_secret_value() if self.token else None,
password=self.password.get_secret_value() if self.password else None,
username=self.username.get_secret_value() if self.username else None,
instance_id=self.instance_id.get_secret_value()
if self.instance_id
else None,
version=self.version.get_secret_value() if self.version else None,
verify=self.verify,
)
watsonx_chat = ModelInference(
model_id=self.model_id,
deployment_id=self.deployment_id,
credentials=credentials,
params=self.params,
project_id=self.project_id,
space_id=self.space_id,
verify=self.verify,
validate=self.validate_model,
)
self.watsonx_model = watsonx_chat
return self
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
if self.streaming:
stream_iter = self._stream(
messages, stop=stop, run_manager=run_manager, **kwargs
)
return generate_from_stream(stream_iter)
message_dicts, params = self._create_message_dicts(messages, stop, **kwargs)
updated_params = self._merge_params(params, kwargs)
response = self.watsonx_model.chat(
messages=message_dicts, **(kwargs | {"params": updated_params})
)
return self._create_chat_result(response)
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
message_dicts, params = self._create_message_dicts(messages, stop, **kwargs)
updated_params = self._merge_params(params, kwargs)
default_chunk_class: Type[BaseMessageChunk] = AIMessageChunk
base_generation_info: dict = {}
is_first_chunk = True
is_first_tool_chunk = True
for chunk in self.watsonx_model.chat_stream(
messages=message_dicts, **(kwargs | {"params": updated_params})
):
if not isinstance(chunk, dict):
chunk = chunk.model_dump()
generation_chunk = _convert_chunk_to_generation_chunk(
chunk,
default_chunk_class,
base_generation_info if is_first_chunk else {},
is_first_chunk,
is_first_tool_chunk,
)
if generation_chunk is None:
continue
default_chunk_class = generation_chunk.message.__class__
logprobs = (generation_chunk.generation_info or {}).get("logprobs")
if run_manager:
run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk, logprobs=logprobs
)
if hasattr(generation_chunk.message, "tool_calls") and isinstance(
generation_chunk.message.tool_calls, list
):
first_tool_call = (
generation_chunk.message.tool_calls[0]
if generation_chunk.message.tool_calls
else None
)
if isinstance(first_tool_call, dict) and first_tool_call.get("name"):
is_first_tool_chunk = False
is_first_chunk = False
yield generation_chunk
@staticmethod
def _merge_params(params: dict, kwargs: dict) -> dict:
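"""Merge explicitly passed chat parameters from ``kwargs`` into ``params``.

Keyword values take precedence. When ``kwargs`` also carries a ``params``
dict, the dictionaries are merged recursively; otherwise the updates simply
override the defaults.
"""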
param_updates = {}
for k in [
"frequency_penalty",
"logprobs",
"top_logprobs",
"max_tokens",
"n",
"presence_penalty",
"response_format",
"temperature",
"top_p",
"time_limit",
]:
if kwargs.get(k) is not None:
param_updates[k] = kwargs.pop(k)
if kwargs.get("params"):
merged_params = merge_dicts(params, param_updates)
else:
merged_params = params | param_updates
return merged_params
def _create_message_dicts(
self, messages: List[BaseMessage], stop: Optional[List[str]], **kwargs: Any
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
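"""Convert messages to request dicts and resolve per-call chat params,
mapping ``stop`` to ``stop_sequences``."""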
params = extract_chat_params(kwargs, self.params)
if stop is not None:
if params and "stop_sequences" in params:
raise ValueError(
"`stop_sequences` found in both the input and default params."
)
params = (params or {}) | {"stop_sequences": stop}
message_dicts = [_convert_message_to_dict(m, self.model_id) for m in messages]
return message_dicts, params or {}
def _create_chat_result(
self, response: dict, generation_info: Optional[Dict] = None
) -> ChatResult:
generations = []
if response.get("error"):
raise ValueError(response.get("error"))
token_usage = response.get("usage", {})
for res in response["choices"]:
message = _convert_dict_to_message(res["message"], response["id"])
if token_usage and isinstance(message, AIMessage):
message.usage_metadata = _create_usage_metadata(token_usage, True)
generation_info = generation_info or {}
generation_info["finish_reason"] = (
res.get("finish_reason")
if res.get("finish_reason") is not None
else generation_info.get("finish_reason")
)
if "logprobs" in res:
generation_info["logprobs"] = res["logprobs"]
gen = ChatGeneration(message=message, generation_info=generation_info)
generations.append(gen)
llm_output = {
"token_usage": token_usage,
"model_name": response.get("model_id", self.model_id),
"system_fingerprint": response.get("system_fingerprint", ""),
}
return ChatResult(generations=generations, llm_output=llm_output)
def bind_functions(
self,
functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
function_call: Optional[
Union[_FunctionCall, str, Literal["auto", "none"]]
] = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, BaseMessage]:
"""Bind functions (and other objects) to this chat model.
Assumes the model is compatible with the IBM watsonx.ai function-calling API.
Args:
functions: A list of function definitions to bind to this chat model.
Can be a dictionary, pydantic model, or callable. Pydantic
models and callables will be automatically converted to
their schema dictionary representation.
function_call: Which function to require the model to call.
Must be the name of the single provided function or
"auto" to automatically determine which function to call
(if any).
**kwargs: Any additional parameters to pass to the
:class:`~langchain_core.runnables.Runnable` constructor.
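
Example:

A minimal sketch, assuming an already instantiated ``chat`` model:

.. code-block:: python

    def get_weather(location: str) -> str:
        '''Get the current weather at a location.'''
        ...

    llm_with_functions = chat.bind_functions([get_weather])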
"""
formatted_functions = [convert_to_openai_function(fn) for fn in functions]
if function_call is not None:
function_call = (
{"name": function_call}
if isinstance(function_call, str)
and function_call not in ("auto", "none")
else function_call
)
if isinstance(function_call, dict) and len(formatted_functions) != 1:
raise ValueError(
"When specifying `function_call`, you must provide exactly one "
"function."
)
if (
isinstance(function_call, dict)
and formatted_functions[0]["name"] != function_call["name"]
):
raise ValueError(
f"Function call {function_call} was specified, but the only "
f"provided function was {formatted_functions[0]['name']}."
)
kwargs = {**kwargs, "function_call": function_call}
return super().bind(
functions=formatted_functions,
**kwargs,
)
def with_structured_output(
self,
schema: Optional[Union[Dict, Type]] = None,
*,
method: Literal["function_calling", "json_mode"] = "function_calling",
include_raw: bool = False,
**kwargs: Any,
) -> Runnable[LanguageModelInput, Union[Dict, BaseModel]]:
"""Model wrapper that returns outputs formatted to match the given schema.
Args:
schema: The output schema as a dict or a Pydantic class. If a Pydantic class
then the model output will be an object of that class. If a dict then
the model output will be a dict. With a Pydantic class the returned
attributes will be validated, whereas with a dict they will not be. If
`method` is "function_calling" and `schema` is a dict, then the dict
must match the IBM watsonx.ai function-calling spec.
method: The method for steering model generation, either "function_calling"
or "json_mode". If "function_calling" then the schema will be converted
to an IBM watsonx.ai function and the returned model will make use of the
function-calling API. If "json_mode" then IBM watsonx.ai's JSON mode will be
used. Note that if using "json_mode" then you must include instructions
for formatting the output to the desired schema in the model call.
include_raw: If False then only the parsed structured output is returned. If
an error occurs during model output parsing it will be raised. If True
then both the raw model response (a BaseMessage) and the parsed model
response will be returned. If an error occurs during output parsing it
will be caught and returned as well. The final output is always a dict
with keys "raw", "parsed", and "parsing_error".
Returns:
A Runnable that takes any ChatModel input and returns as output:
If include_raw is True then a dict with keys:
raw: BaseMessage
parsed: Optional[_DictOrPydantic]
parsing_error: Optional[BaseException]
If include_raw is False then just _DictOrPydantic is returned,
where _DictOrPydantic depends on the schema:
If schema is a Pydantic class then _DictOrPydantic is the Pydantic
class.
If schema is a dict then _DictOrPydantic is a dict.
Example: Function-calling, Pydantic schema (method="function_calling", include_raw=False):
.. code-block:: python
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
llm = ChatWatsonx(...)
structured_llm = llm.with_structured_output(AnswerWithJustification)
structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
Example: Function-calling, Pydantic schema (method="function_calling", include_raw=True):
.. code-block:: python
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
llm = ChatWatsonx(...)
structured_llm = llm.with_structured_output(AnswerWithJustification, include_raw=True)
structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
Example: Function-calling, dict schema (method="function_calling", include_raw=False):
.. code-block:: python
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
llm = ChatWatsonx(...)
structured_llm = llm.with_structured_output(dict_schema)
structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Example: JSON mode, Pydantic schema (method="json_mode", include_raw=True):
.. code-block::
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
answer: str
justification: str
llm = ChatWatsonx(...)
structured_llm = llm.with_structured_output(
AnswerWithJustification,
method="json_mode",
include_raw=True
)
structured_llm.invoke(
"Answer the following question. "
"Make sure to return a JSON blob with keys 'answer' and 'justification'.\n\n"
"What's heavier a pound of bricks or a pound of feathers?"
)
# -> {
# 'raw': AIMessage(content='{\n "answer": "They are both the same weight.",\n "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight." \n}'),
# 'parsed': AnswerWithJustification(answer='They are both the same weight.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.'),
# 'parsing_error': None
# }
Example: JSON mode, no schema (schema=None, method="json_mode", include_raw=True):
.. code-block::
from langchain_ibm import ChatWatsonx
structured_llm = llm.with_structured_output(method="json_mode", include_raw=True)
structured_llm.invoke(
"Answer the following question. "
"Make sure to return a JSON blob with keys 'answer' and 'justification'.\n\n"
"What's heavier a pound of bricks or a pound of feathers?"
)
# -> {
# 'raw': AIMessage(content='{\n "answer": "They are both the same weight.",\n "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight." \n}'),
# 'parsed': {
# 'answer': 'They are both the same weight.',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.'
# },
# 'parsing_error': None
# }
""" # noqa: E501
if kwargs:
raise ValueError(f"Received unsupported arguments {kwargs}")
is_pydantic_schema = isinstance(schema, type) and is_basemodel_subclass(schema)
if method == "function_calling":
if schema is None:
raise ValueError(
"schema must be specified when method is 'function_calling'. "
"Received None."
)
# Bind the schema as a tool and force the model to call it.
tool_name = convert_to_openai_tool(schema)["function"]["name"]
tool_choice = {"type": "function", "function": {"name": tool_name}}
llm = self.bind_tools([schema], tool_choice=tool_choice)
if is_pydantic_schema:
output_parser: OutputParserLike = PydanticToolsParser(
tools=[schema], # type: ignore[list-item]
first_tool_only=True, # type: ignore[list-item]
)
else:
key_name = convert_to_openai_tool(schema)["function"]["name"]
output_parser = JsonOutputKeyToolsParser(
key_name=key_name, first_tool_only=True
)
elif method == "json_mode":
llm = self.bind(response_format={"type": "json_object"})
output_parser = (
PydanticOutputParser(pydantic_object=schema) # type: ignore[type-var, arg-type]
if is_pydantic_schema
else JsonOutputParser()
)
if include_raw:
parser_assign = RunnablePassthrough.assign(
parsed=itemgetter("raw") | output_parser, parsing_error=lambda _: None
)
parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
parser_with_fallback = parser_assign.with_fallbacks(
[parser_none], exception_key="parsing_error"
)
return RunnableMap(raw=llm) | parser_with_fallback
else:
return llm | output_parser
def _is_pydantic_class(obj: Any) -> bool:
return isinstance(obj, type) and issubclass(obj, BaseModel)
def _lc_tool_call_to_watsonx_tool_call(tool_call: ToolCall) -> dict:
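"""Render a LangChain ``ToolCall`` as a watsonx.ai tool call dict."""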
return {
"type": "function",
"id": tool_call["id"],
"function": {
"name": tool_call["name"],
"arguments": json.dumps(tool_call["args"]),
},
}
def _lc_invalid_tool_call_to_watsonx_tool_call(
invalid_tool_call: InvalidToolCall,
) -> dict:
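"""Render an ``InvalidToolCall``; the arguments stay as the raw string."""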
return {
"type": "function",
"id": invalid_tool_call["id"],
"function": {
"name": invalid_tool_call["name"],
"arguments": invalid_tool_call["args"],
},
}
def _create_usage_metadata(
oai_token_usage: dict, is_first_chunk: bool
) -> UsageMetadata:
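"""Build ``UsageMetadata`` from an OpenAI-style token usage dict.

When streaming, prompt tokens are counted only on the first chunk so that
summing usage across chunks does not double-count the input.
"""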
input_tokens = oai_token_usage.get("prompt_tokens", 0) if is_first_chunk else 0
output_tokens = oai_token_usage.get("completion_tokens", 0)
total_tokens = oai_token_usage.get("total_tokens", input_tokens + output_tokens)
return UsageMetadata(
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=total_tokens,
)