Source code for langchain_pinecone.embeddings

import logging
from typing import Any, Dict, Iterable, List, Optional

import aiohttp
from langchain_core.embeddings import Embeddings
from langchain_core.utils import secret_from_env
from pinecone import Pinecone as PineconeClient  # type: ignore[import-untyped]
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    PrivateAttr,
    SecretStr,
    model_validator,
)
from typing_extensions import Self

logger = logging.getLogger(__name__)

DEFAULT_BATCH_SIZE = 64


[docs] class PineconeEmbeddings(BaseModel, Embeddings): """PineconeEmbeddings embedding model. Example: .. code-block:: python from langchain_pinecone import PineconeEmbeddings model = PineconeEmbeddings(model="multilingual-e5-large") """ # Clients _client: PineconeClient = PrivateAttr(default=None) _async_client: Optional[aiohttp.ClientSession] = PrivateAttr(default=None) model: str """Model to use for example 'multilingual-e5-large'.""" # Config batch_size: Optional[int] = None """Batch size for embedding documents.""" query_params: Dict = Field(default_factory=dict) """Parameters for embedding query.""" document_params: Dict = Field(default_factory=dict) """Parameters for embedding document""" # dimension: Optional[int] = None # show_progress_bar: bool = False pinecone_api_key: SecretStr = Field( default_factory=secret_from_env( "PINECONE_API_KEY", error_message="Pinecone API key not found. Please set the PINECONE_API_KEY " "environment variable or pass it via `pinecone_api_key`.", ), alias="api_key", ) """Pinecone API key. If not provided, will look for the PINECONE_API_KEY environment variable.""" model_config = ConfigDict( extra="forbid", populate_by_name=True, protected_namespaces=(), ) @property def async_client(self) -> aiohttp.ClientSession: """Lazily initialize the async client.""" if self._async_client is None: self._async_client = aiohttp.ClientSession( headers={ "Api-Key": self.pinecone_api_key.get_secret_value(), "Content-Type": "application/json", "X-Pinecone-API-Version": "2024-10", } ) return self._async_client @model_validator(mode="before") @classmethod def set_default_config(cls, values: dict) -> Any: """Set default configuration based on model.""" default_config_map = { "multilingual-e5-large": { "batch_size": 96, "query_params": {"input_type": "query", "truncation": "END"}, "document_params": {"input_type": "passage", "truncation": "END"}, "dimension": 1024, } } model = values.get("model") if model in default_config_map: config = default_config_map[model] for key, value in config.items(): if key not in values: values[key] = value return values @model_validator(mode="after") def validate_environment(self) -> Self: """Validate that Pinecone version and credentials exist in environment.""" api_key_str = self.pinecone_api_key.get_secret_value() client = PineconeClient(api_key=api_key_str, source_tag="langchain") self._client = client # Ensure async_client is lazily initialized _ = self.async_client return self def _get_batch_iterator(self, texts: List[str]) -> Iterable: if self.batch_size is None: batch_size = DEFAULT_BATCH_SIZE else: batch_size = self.batch_size if self.show_progress_bar: try: from tqdm.auto import tqdm # type: ignore except ImportError as e: raise ImportError( "Must have tqdm installed if `show_progress_bar` is set to True. " "Please install with `pip install tqdm`." ) from e _iter = tqdm(range(0, len(texts), batch_size)) else: _iter = range(0, len(texts), batch_size) return _iter
[docs] def embed_documents(self, texts: List[str]) -> List[List[float]]: """Embed search docs.""" embeddings: List[List[float]] = [] _iter = self._get_batch_iterator(texts) for i in _iter: response = self._client.inference.embed( model=self.model, parameters=self.document_params, inputs=texts[i : i + self.batch_size], ) embeddings.extend([r["values"] for r in response]) return embeddings
[docs] async def aembed_documents(self, texts: List[str]) -> List[List[float]]: embeddings: List[List[float]] = [] _iter = self._get_batch_iterator(texts) for i in _iter: response = await self._aembed_texts( model=self.model, parameters=self.document_params, texts=texts[i : i + self.batch_size], ) embeddings.extend([r["values"] for r in response["data"]]) return embeddings
[docs] def embed_query(self, text: str) -> List[float]: """Embed query text.""" return self._client.inference.embed( model=self.model, parameters=self.query_params, inputs=[text] )[0]["values"]
[docs] async def aembed_query(self, text: str) -> List[float]: """Asynchronously embed query text.""" response = await self._aembed_texts( model=self.model, parameters=self.document_params, texts=[text], ) return response["data"][0]["values"]
async def _aembed_texts( self, texts: List[str], model: str, parameters: dict ) -> Dict: data = { "model": model, "inputs": [{"text": text} for text in texts], "parameters": parameters, } async with self.async_client.post( "https://api.pinecone.io/embed", json=data ) as response: response_data = await response.json(content_type=None) return response_data