Source code for langchain_cohere.llms

from __future__ import annotations

import logging
import re
from typing import Any, Dict, List, Optional

import cohere
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.load.serializable import Serializable
from langchain_core.pydantic_v1 import Extra, Field, SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env

from .utils import _create_retry_decorator


def enforce_stop_tokens(text: str, stop: List[str]) -> str:
    """Cut off the text as soon as any stop sequence occurs.

    Stop sequences are treated as literal strings, not regular expressions,
    so entries such as ``"."``, ``"+"`` or ``"("`` only match themselves.

    Args:
        text: The generated text to truncate.
        stop: Stop sequences; empty strings are ignored.

    Returns:
        The part of ``text`` preceding the first occurrence of any stop
        sequence, or ``text`` unchanged when no usable stop sequence is given
        or none occurs.
    """
    # Escape each entry so regex metacharacters are matched literally, and
    # drop empty strings, which would otherwise match at position 0 and
    # truncate the whole text.
    literals = [re.escape(token) for token in stop if token]
    if not literals:
        return text
    return re.split("|".join(literals), text, maxsplit=1)[0]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def completion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Call ``llm.client.generate`` through the retry decorator.

    Args:
        llm: The Cohere LLM whose ``client`` performs the request and whose
            ``max_retries`` is passed to ``_create_retry_decorator``.
        **kwargs: Forwarded unchanged to ``llm.client.generate``.

    Returns:
        Whatever the decorated ``llm.client.generate`` call returns.
    """

    def _completion_with_retry(**kwargs: Any) -> Any:
        return llm.client.generate(**kwargs)

    # Apply the decorator explicitly instead of with ``@`` syntax.
    guarded_call = _create_retry_decorator(llm.max_retries)(_completion_with_retry)
    return guarded_call(**kwargs)
def acompletion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Call ``llm.async_client.generate`` through the retry decorator.

    This function is itself synchronous: it returns the result of calling
    the decorated coroutine function, which the caller is expected to await.

    Args:
        llm: The Cohere LLM whose ``async_client`` performs the request and
            whose ``max_retries`` is passed to ``_create_retry_decorator``.
        **kwargs: Forwarded unchanged to ``llm.async_client.generate``.

    Returns:
        The object produced by calling the decorated coroutine function.
    """

    async def _completion_with_retry(**kwargs: Any) -> Any:
        return await llm.async_client.generate(**kwargs)

    # Apply the decorator explicitly instead of with ``@`` syntax.
    guarded_call = _create_retry_decorator(llm.max_retries)(_completion_with_retry)
    return guarded_call(**kwargs)
class BaseCohere(Serializable):
    """Shared configuration and client construction for Cohere models."""

    client: Any = None  #: :meta private:
    async_client: Any = None  #: :meta private:

    model: Optional[str] = Field(default=None)
    """Name of the model to use."""

    temperature: Optional[float] = None
    """Non-negative float controlling the randomness of generation."""

    cohere_api_key: Optional[SecretStr] = None
    """Cohere API key; read from the environment when not supplied."""

    stop: Optional[List[str]] = None

    streaming: bool = Field(default=False)
    """Whether results should be streamed."""

    user_agent: str = "langchain:partner"
    """Identifier of the application issuing the request."""

    timeout_seconds: Optional[float] = 300
    """Timeout, in seconds, for Cohere API requests."""

    base_url: Optional[str] = None
    """Alternative Cohere API URL to use instead of the default."""

    @root_validator(pre=False, skip_on_failure=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Resolve the API key and build the sync and async Cohere clients.

        The key is taken from ``values`` or the ``COHERE_API_KEY`` environment
        variable, stored back as a secret string, and used to create
        ``client`` and ``async_client``.
        """
        secret_key = convert_to_secret_str(
            get_from_dict_or_env(values, "cohere_api_key", "COHERE_API_KEY")
        )
        values["cohere_api_key"] = secret_key

        # Both clients share the same identification, timeout and endpoint.
        shared_kwargs = {
            "client_name": values["user_agent"],
            "timeout": values.get("timeout_seconds"),
            "base_url": values["base_url"],
        }
        values["client"] = cohere.Client(
            api_key=secret_key.get_secret_value(), **shared_kwargs
        )
        values["async_client"] = cohere.AsyncClient(
            api_key=secret_key.get_secret_value(), **shared_kwargs
        )
        return values
class Cohere(LLM, BaseCohere):
    """Cohere large language models.

    To use, you should have the ``cohere`` python package installed, and the
    environment variable ``COHERE_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.

    Example:
        .. code-block:: python

            from langchain_cohere import Cohere

            cohere = Cohere(cohere_api_key="my-api-key")
    """

    max_tokens: Optional[int] = None
    """Denotes the number of tokens to predict per generation."""

    k: Optional[int] = None
    """Number of most likely tokens to consider at each step."""

    # A probability mass is fractional; an ``int`` annotation would coerce
    # values such as 0.75 down to 0 during validation.
    p: Optional[float] = None
    """Total probability mass of tokens to consider at each step."""

    frequency_penalty: Optional[float] = None
    """Penalizes repeated tokens according to frequency. Between 0 and 1."""

    presence_penalty: Optional[float] = None
    """Penalizes repeated tokens. Between 0 and 1."""

    truncate: Optional[str] = None
    """Specify how the client handles inputs longer than the maximum token
    length: Truncate from START, END or NONE"""

    max_retries: int = 10
    """Maximum number of retries to make when generating."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
        extra = Extra.forbid

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Configurable parameters for calling Cohere's generate API.

        Parameters whose value is ``None`` are omitted. A fresh dict is
        returned on every access, so callers may mutate it.
        """
        base_params = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "k": self.k,
            "p": self.p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "truncate": self.truncate,
        }
        return {k: v for k, v in base_params.items() if v is not None}

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Map secret field names to their environment variable names."""
        return {"cohere_api_key": "COHERE_API_KEY"}

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return self._default_params

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "cohere"

    def _invocation_params(self, stop: Optional[List[str]], **kwargs: Any) -> dict:
        """Merge default params, stop sequences and per-call overrides.

        Args:
            stop: Stop sequences supplied for this call, if any.
            **kwargs: Per-call parameters; they override the defaults.

        Returns:
            The keyword arguments for the generate call, always containing a
            ``stop_sequences`` entry (possibly ``None``).

        Raises:
            ValueError: If stop sequences are given both on the instance and
                for this call.
        """
        params = self._default_params
        if self.stop is not None and stop is not None:
            raise ValueError("`stop` found in both the input and default params.")
        elif self.stop is not None:
            params["stop_sequences"] = self.stop
        else:
            params["stop_sequences"] = stop
        return {**params, **kwargs}

    def _process_response(self, response: Any, stop: Optional[List[str]]) -> str:
        """Extract the first generation's text and strip stop sequences.

        Args:
            response: The generate response; ``response.generations[0].text``
                is read.
            stop: Stop sequences in effect for the call, if any.

        Returns:
            The generated text, cut before the first stop sequence.
        """
        text = response.generations[0].text

        # If stop tokens are provided, Cohere's endpoint returns them.
        # In order to make this consistent with other endpoints, we strip them.
        if stop:
            text = enforce_stop_tokens(text, stop)

        return text

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra generate parameters overriding the defaults.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = cohere("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        # ``params`` already carries "model" when one is configured or
        # overridden; merging into a dict avoids passing the ``model`` keyword
        # twice, which would raise a TypeError.
        request = {"model": self.model, **params}
        response = completion_with_retry(self, prompt=prompt, **request)
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Async call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra generate parameters overriding the defaults.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = await cohere.ainvoke("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        # Same merge as in ``_call``: never pass the ``model`` keyword twice.
        request = {"model": self.model, **params}
        response = await acompletion_with_retry(self, prompt=prompt, **request)
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)