from __future__ import annotations
import logging
import re
from typing import Any, Dict, List, Optional
import cohere
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.load.serializable import Serializable
from langchain_core.pydantic_v1 import Extra, Field, SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
from .utils import _create_retry_decorator
def enforce_stop_tokens(text: str, stop: List[str]) -> str:
    """Cut off the text as soon as any stop word occurs.

    Stop words are matched literally: each one is escaped before being
    combined into the split pattern, so tokens containing regex
    metacharacters (``.``, ``+``, ``(``, ...) behave like plain strings.

    Args:
        text: The generated text to truncate.
        stop: Stop words; the text is cut at the earliest occurrence of any
            of them. Empty strings are ignored.

    Returns:
        The portion of ``text`` preceding the first stop word, or ``text``
        unchanged when no (non-empty) stop word is given or none occurs.
    """
    # An empty token would match at position 0 and wipe out the whole text,
    # so only non-empty tokens take part in the pattern.
    literal_patterns = [re.escape(token) for token in stop if token]
    if not literal_patterns:
        return text
    return re.split("|".join(literal_patterns), text, maxsplit=1)[0]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def completion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Call the synchronous Cohere ``generate`` endpoint with retries.

    Args:
        llm: The model wrapper supplying ``client`` and ``max_retries``.
        **kwargs: Forwarded verbatim to ``llm.client.generate``.

    Returns:
        Whatever ``llm.client.generate`` returns.
    """
    with_retries = _create_retry_decorator(llm.max_retries)

    def _completion_with_retry(**call_kwargs: Any) -> Any:
        return llm.client.generate(**call_kwargs)

    # Apply the retry decorator explicitly, then invoke the wrapped call once.
    guarded_call = with_retries(_completion_with_retry)
    return guarded_call(**kwargs)
def acompletion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Call the asynchronous Cohere ``generate`` endpoint with retries.

    This function is itself synchronous: it returns the result of calling
    the retry-wrapped coroutine function, which the caller is expected to
    ``await``.

    Args:
        llm: The model wrapper supplying ``async_client`` and ``max_retries``.
        **kwargs: Forwarded verbatim to ``llm.async_client.generate``.

    Returns:
        The value produced by calling the retry-wrapped coroutine function.
    """
    with_retries = _create_retry_decorator(llm.max_retries)

    async def _completion_with_retry(**call_kwargs: Any) -> Any:
        return await llm.async_client.generate(**call_kwargs)

    # Apply the retry decorator explicitly, then invoke the wrapped call once.
    guarded_call = with_retries(_completion_with_retry)
    return guarded_call(**kwargs)
class BaseCohere(Serializable):
    """Shared configuration and client construction for Cohere models."""

    client: Any = None  #: :meta private:
    async_client: Any = None  #: :meta private:

    model: Optional[str] = Field(default=None)
    """Name of the model to use."""

    temperature: Optional[float] = None
    """Non-negative float controlling how random the generation is."""

    cohere_api_key: Optional[SecretStr] = None
    """Cohere API key; read from the environment variable when not given."""

    stop: Optional[List[str]] = None
    """Default stop sequences."""

    streaming: bool = Field(default=False)
    """Whether results should be streamed."""

    user_agent: str = "langchain:partner"
    """Identifier of the application issuing the request."""

    timeout_seconds: Optional[float] = 300
    """Timeout, in seconds, for requests to the Cohere API."""

    base_url: Optional[str] = None
    """Replacement for the default Cohere API URL."""

    @root_validator(pre=False, skip_on_failure=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Resolve the API key and build the sync and async Cohere clients.

        The key is taken from ``values`` or the ``COHERE_API_KEY``
        environment variable and stored back as a secret string. Both
        clients are created from the same settings and stored under
        ``client`` and ``async_client``.
        """
        api_key = convert_to_secret_str(
            get_from_dict_or_env(values, "cohere_api_key", "COHERE_API_KEY")
        )
        values["cohere_api_key"] = api_key

        # Both clients take exactly the same construction arguments.
        client_settings = {
            "api_key": api_key.get_secret_value(),
            "timeout": values.get("timeout_seconds"),
            "client_name": values["user_agent"],
            "base_url": values["base_url"],
        }
        values["client"] = cohere.Client(**client_settings)
        values["async_client"] = cohere.AsyncClient(**client_settings)
        return values
class Cohere(LLM, BaseCohere):
    """Cohere large language models.

    To use, you should have the ``cohere`` python package installed, and the
    environment variable ``COHERE_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.

    Example:
        .. code-block:: python

            from langchain_cohere import Cohere

            cohere = Cohere(cohere_api_key="my-api-key")
    """

    max_tokens: Optional[int] = None
    """Denotes the number of tokens to predict per generation."""

    k: Optional[int] = None
    """Number of most likely tokens to consider at each step."""

    # A probability mass is fractional; an integer field would make pydantic
    # truncate values such as 0.75 down to 0.
    p: Optional[float] = None
    """Total probability mass of tokens to consider at each step."""

    frequency_penalty: Optional[float] = None
    """Penalizes repeated tokens according to frequency. Between 0 and 1."""

    presence_penalty: Optional[float] = None
    """Penalizes repeated tokens. Between 0 and 1."""

    truncate: Optional[str] = None
    """Specify how the client handles inputs longer than the maximum token
    length: Truncate from START, END or NONE"""

    max_retries: int = 10
    """Maximum number of retries to make when generating."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
        extra = Extra.forbid

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Configurable parameters for calling Cohere's generate API.

        Only parameters that have been set (are not ``None``) are included.
        """
        base_params = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "k": self.k,
            "p": self.p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "truncate": self.truncate,
        }
        return {k: v for k, v in base_params.items() if v is not None}

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Map secret constructor arguments to their environment variables."""
        return {"cohere_api_key": "COHERE_API_KEY"}

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return self._default_params

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "cohere"

    def _invocation_params(self, stop: Optional[List[str]], **kwargs: Any) -> dict:
        """Build the keyword arguments for a single generate call.

        Args:
            stop: Stop sequences supplied for this call, if any.
            **kwargs: Per-call overrides; they take precedence over defaults.

        Returns:
            The default parameters plus ``stop_sequences`` (which may be
            ``None``), overlaid with ``kwargs``.

        Raises:
            ValueError: If stop sequences are set both on the instance and
                for this call.
        """
        params = self._default_params
        if self.stop is not None and stop is not None:
            raise ValueError("`stop` found in both the input and default params.")
        elif self.stop is not None:
            params["stop_sequences"] = self.stop
        else:
            params["stop_sequences"] = stop
        return {**params, **kwargs}

    def _process_response(self, response: Any, stop: Optional[List[str]]) -> str:
        """Extract the first generation's text and strip stop tokens from it."""
        text = response.generations[0].text

        # If stop tokens are provided, Cohere's endpoint returns them.
        # In order to make this consistent with other endpoints, we strip them.
        if stop:
            text = enforce_stop_tokens(text, stop)

        return text

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = cohere.invoke("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        # ``params`` already contains "model" whenever one is configured, so
        # merge it here instead of passing ``model=`` as a separate keyword,
        # which would raise "got multiple values for keyword argument".
        request = {"model": self.model, **params}
        response = completion_with_retry(self, prompt=prompt, **request)
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Async call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = await cohere.ainvoke("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        # ``params`` already contains "model" whenever one is configured, so
        # merge it here instead of passing ``model=`` as a separate keyword,
        # which would raise "got multiple values for keyword argument".
        request = {"model": self.model, **params}
        response = await acompletion_with_retry(self, prompt=prompt, **request)
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)