import logging
from typing import Any, Dict, List, Optional, Union
from ibm_watsonx_ai import APIClient # type: ignore
from ibm_watsonx_ai.foundation_models.embeddings import Embeddings # type: ignore
from ibm_watsonx_ai.gateway import Gateway # type: ignore
from langchain_core.embeddings import Embeddings as LangChainEmbeddings
from langchain_core.utils.utils import secret_from_env
from pydantic import BaseModel, ConfigDict, Field, SecretStr, model_validator
from typing_extensions import Self
from langchain_ibm.utils import (
async_gateway_error_handler,
extract_params,
gateway_error_handler,
resolve_watsonx_credentials,
)
logger = logging.getLogger(__name__)
[docs]
class WatsonxEmbeddings(BaseModel, LangChainEmbeddings):
"""IBM watsonx.ai embedding models."""
model_id: Optional[str] = None
"""Type of model to use."""
model: Optional[str] = None
"""
Name or alias of the foundation model to use.
When using IBM’s watsonx.ai Model Gateway (public preview), you can specify any
supported third-party model—OpenAI, Anthropic, NVIDIA, Cerebras, or IBM’s own
Granite series—via a single, OpenAI-compatible interface. Models must be explicitly
provisioned (opt-in) through the Gateway to ensure secure, vendor-agnostic access
and easy switch-over without reconfiguration.
For more details on configuration and usage, see IBM watsonx Model Gateway docs: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-model-gateway.html?context=wx&audience=wdp
"""
project_id: Optional[str] = None
"""ID of the Watson Studio project."""
space_id: Optional[str] = None
"""ID of the Watson Studio space."""
url: SecretStr = Field(
alias="url",
default_factory=secret_from_env("WATSONX_URL", default=None), # type: ignore[assignment]
)
"""URL to the Watson Machine Learning or CPD instance."""
apikey: Optional[SecretStr] = Field(
alias="apikey", default_factory=secret_from_env("WATSONX_APIKEY", default=None)
)
"""API key to the Watson Machine Learning or CPD instance."""
token: Optional[SecretStr] = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
"""Token to the CPD instance."""
password: Optional[SecretStr] = Field(
alias="password",
default_factory=secret_from_env("WATSONX_PASSWORD", default=None),
)
"""Password to the CPD instance."""
username: Optional[SecretStr] = Field(
alias="username",
default_factory=secret_from_env("WATSONX_USERNAME", default=None),
)
"""Username to the CPD instance."""
instance_id: Optional[SecretStr] = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
"""Instance_id of the CPD instance."""
version: Optional[SecretStr] = None
"""Version of the CPD instance."""
params: Optional[Dict] = None
"""Model parameters to use during request generation."""
verify: Union[str, bool, None] = None
"""You can pass one of following as verify:
* the path to a CA_BUNDLE file
* the path of directory with certificates of trusted CAs
* True - default path to truststore will be taken
* False - no verification will be made
"""
watsonx_embed: Embeddings = Field(default=None) #: :meta private:
watsonx_embed_gateway: Gateway = Field(
default=None, exclude=True
) #: :meta private:
watsonx_client: Optional[APIClient] = Field(default=None) #: :meta private:
model_config = ConfigDict(
extra="forbid",
arbitrary_types_allowed=True,
protected_namespaces=(),
)
@model_validator(mode="after")
def validate_environment(self) -> Self:
"""Validate that credentials and python package exists in environment."""
if self.watsonx_embed_gateway is not None:
raise NotImplementedError(
"Passing the 'watsonx_embed_gateway' parameter to the "
"WatsonxEmbeddings constructor is not supported yet."
)
if isinstance(self.watsonx_embed, Embeddings):
self.model_id = getattr(self.watsonx_embed, "model_id")
self.project_id = getattr(
getattr(self.watsonx_embed, "_client"),
"default_project_id",
)
self.space_id = getattr(
getattr(self.watsonx_embed, "_client"), "default_space_id"
)
self.params = getattr(self.watsonx_embed, "params")
elif isinstance(self.watsonx_client, APIClient):
if sum(map(bool, (self.model, self.model_id))) != 1:
raise ValueError(
"The parameters 'model' and 'model_id' are mutually exclusive. "
"Please specify exactly one of these parameters when "
"initializing WatsonxEmbeddings."
)
if self.model is not None:
watsonx_embed_gateway = Gateway(
api_client=self.watsonx_client,
verify=self.verify,
)
self.watsonx_embed_gateway = watsonx_embed_gateway
else:
watsonx_embed = Embeddings(
model_id=self.model_id,
params=self.params,
api_client=self.watsonx_client,
project_id=self.project_id,
space_id=self.space_id,
verify=self.verify,
)
self.watsonx_embed = watsonx_embed
else:
if sum(map(bool, (self.model, self.model_id))) != 1:
raise ValueError(
"The parameters 'model' and 'model_id' are mutually exclusive. "
"Please specify exactly one of these parameters when "
"initializing WatsonxEmbeddings."
)
credentials = resolve_watsonx_credentials(
url=self.url,
apikey=self.apikey,
token=self.token,
password=self.password,
username=self.username,
instance_id=self.instance_id,
version=self.version,
verify=self.verify,
)
if self.model is not None:
watsonx_embed_gateway = Gateway(
credentials=credentials,
verify=self.verify,
)
self.watsonx_embed_gateway = watsonx_embed_gateway
else:
watsonx_embed = Embeddings(
model_id=self.model_id,
params=self.params,
credentials=credentials,
project_id=self.project_id,
space_id=self.space_id,
)
self.watsonx_embed = watsonx_embed
return self
@gateway_error_handler
def _call_model_gateway(
self, *, model: str, texts: List[str], **params: Any
) -> Any:
return self.watsonx_embed_gateway.embeddings.create(
model=model, input=texts, **params
)
@async_gateway_error_handler
async def _acall_model_gateway(
self, *, model: str, texts: List[str], **params: Any
) -> Any:
return await self.watsonx_embed_gateway.embeddings.acreate(
model=model, input=texts, **params
)
[docs]
def embed_documents(self, texts: List[str], **kwargs: Any) -> List[List[float]]:
"""Embed search docs."""
params = extract_params(kwargs, self.params)
if self.watsonx_embed_gateway is not None:
call_kwargs = {**kwargs, **params}
embed_response = self._call_model_gateway(
model=self.model, texts=texts, **call_kwargs
)
return [embedding["embedding"] for embedding in embed_response["data"]]
else:
return self.watsonx_embed.embed_documents(
texts=texts, **(kwargs | {"params": params})
)
[docs]
async def aembed_documents(
self, texts: List[str], **kwargs: Any
) -> List[List[float]]:
"""Asynchronous Embed search docs."""
params = extract_params(kwargs, self.params)
if self.watsonx_embed_gateway is not None:
call_kwargs = {**kwargs, **params}
embed_response = await self._acall_model_gateway(
model=self.model, texts=texts, **call_kwargs
)
return [embedding["embedding"] for embedding in embed_response["data"]]
else:
return await self.watsonx_embed.aembed_documents(
texts=texts, **(kwargs | {"params": params})
)
[docs]
def embed_query(self, text: str, **kwargs: Any) -> List[float]:
"""Embed query text."""
return self.embed_documents([text], **kwargs)[0]
[docs]
async def aembed_query(self, text: str, **kwargs: Any) -> List[float]:
"""Asynchronous Embed query text."""
embeddings = await self.aembed_documents([text], **kwargs)
return embeddings[0]