Source code for langchain_community.llms.sambanova

import json
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import requests
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
from pydantic import Field, SecretStr
from requests import Response


class SambaStudio(LLM):
    """
    SambaStudio large language models.

    Setup:
        To use, you should have the environment variables
        ``SAMBASTUDIO_URL`` set with your SambaStudio environment URL.
        ``SAMBASTUDIO_API_KEY`` set with your SambaStudio endpoint API key.
        https://sambanova.ai/products/enterprise-ai-platform-sambanova-suite
        read extra documentation in
        https://docs.sambanova.ai/sambastudio/latest/index.html

        Example:

        .. code-block:: python

            from langchain_community.llms.sambanova import SambaStudio
            SambaStudio(
                sambastudio_url="your-SambaStudio-environment-URL",
                sambastudio_api_key="your-SambaStudio-API-key",
                model_kwargs={
                    "model" : model or expert name (set for CoE endpoints),
                    "max_tokens" : max number of tokens to generate,
                    "temperature" : model temperature,
                    "top_p" : model top p,
                    "top_k" : model top k,
                    "do_sample" : whether to do sample,
                    "process_prompt": whether to process prompt
                        (set for CoE generic v1 and v2 endpoints)
                },
            )

    Key init args — completion params:
        model: str
            The name of the model to use, e.g., Meta-Llama-3-70B-Instruct-4096
            (set for CoE endpoints).
        streaming: bool
            Whether to use streaming handler when using non streaming methods
        model_kwargs: dict
            Extra Key word arguments to pass to the model:
                max_tokens: int
                    max tokens to generate
                temperature: float
                    model temperature
                top_p: float
                    model top p
                top_k: int
                    model top k
                do_sample: bool
                    whether to do sample
                process_prompt:
                    whether to process prompt
                    (set for CoE generic v1 and v2 endpoints)

    Key init args — client params:
        sambastudio_url: str
            SambaStudio endpoint Url
        sambastudio_api_key: str
            SambaStudio endpoint api key

    Instantiate:
        .. code-block:: python

            from langchain_community.llms import SambaStudio

            llm = SambaStudio(
                sambastudio_url = set with your SambaStudio deployed endpoint URL,
                sambastudio_api_key = set with your SambaStudio deployed endpoint Key,
                model_kwargs = {
                    "model" : model or expert name (set for CoE endpoints),
                    "max_tokens" : max number of tokens to generate,
                    "temperature" : model temperature,
                    "top_p" : model top p,
                    "top_k" : model top k,
                    "do_sample" : whether to do sample,
                    "process_prompt" : whether to process prompt
                        (set for CoE generic v1 and v2 endpoints)
                }
            )

    Invoke:
        .. code-block:: python

            prompt = "tell me a joke"
            response = llm.invoke(prompt)

    Stream:
        .. code-block:: python

            for chunk in llm.stream(prompt):
                print(chunk, end="", flush=True)

    Async:
        .. code-block:: python

            response = llm.ainvoke(prompt)
            await response
    """

    sambastudio_url: str = Field(default="")
    """SambaStudio Url"""

    sambastudio_api_key: SecretStr = Field(default="")
    """SambaStudio api key"""

    base_url: str = Field(default="", exclude=True)
    """SambaStudio non streaming URL"""

    streaming_url: str = Field(default="", exclude=True)
    """SambaStudio streaming URL"""

    streaming: bool = Field(default=False)
    """Whether to use streaming handler when using non streaming methods"""

    model_kwargs: Optional[Dict[str, Any]] = None
    """Key word arguments to pass to the model."""

    class Config:
        populate_by_name = True

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this model can be serialized by Langchain."""
        return True

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Map constructor argument names to the secret ids that hold them."""
        return {
            "sambastudio_url": "sambastudio_url",
            "sambastudio_api_key": "sambastudio_api_key",
        }

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters.

        This information is used by the LangChain callback system, which
        is used for tracing purposes make it possible to monitor LLMs.
        """
        return {"streaming": self.streaming, "model_kwargs": self.model_kwargs}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "sambastudio-llm"

    def __init__(self, **kwargs: Any) -> None:
        """Resolve the URL and API key (arguments first, then environment
        variables), derive the streaming / non streaming URLs and initialise
        the model.
        """
        kwargs["sambastudio_url"] = get_from_dict_or_env(
            kwargs, "sambastudio_url", "SAMBASTUDIO_URL"
        )
        kwargs["sambastudio_api_key"] = convert_to_secret_str(
            get_from_dict_or_env(kwargs, "sambastudio_api_key", "SAMBASTUDIO_API_KEY")
        )
        kwargs["base_url"], kwargs["streaming_url"] = self._get_sambastudio_urls(
            kwargs["sambastudio_url"]
        )
        super().__init__(**kwargs)

    def _get_sambastudio_urls(self, url: str) -> Tuple[str, str]:
        """
        Get streaming and non streaming URLs from the given URL

        Args:
            url: string with sambastudio base or streaming endpoint url

        Returns:
            base_url: string with url to do non streaming calls
            streaming_url: string with url to do streaming calls

        Raises:
            ValueError: if the url contains none of ``openai``, ``stream``
                or ``generic``.
        """
        # openai compatible endpoints use one URL for both call modes
        if "openai" in url:
            return url, url
        # a streaming URL was given: derive the non streaming one from it
        if "stream" in url:
            return url.replace("stream/", ""), url
        # a non streaming generic URL was given: derive the streaming one
        if "generic" in url:
            return url, "generic/stream".join(url.split("generic"))
        raise ValueError("Unsupported URL")

    def _get_tuning_params(self, stop: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Get the tuning parameters to use when calling the LLM.

        Args:
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of the stop substrings.

        Returns:
            The tuning parameters in the format required by api to use

        Raises:
            ValueError: if ``sambastudio_url`` matches none of the supported APIs.
        """
        # Work on a copy: the stop words of one call and the key renames below
        # must not be written back into ``self.model_kwargs``, otherwise they
        # would leak into every later call made with this instance.
        _model_kwargs = dict(self.model_kwargs or {})

        # handle the case where stop sequences are send in the invocation
        # and stop sequences has been also set in the model parameters
        _stop_sequences = _model_kwargs.get("stop_sequences", []) + (stop or [])
        if len(_stop_sequences) > 0:
            _model_kwargs["stop_sequences"] = _stop_sequences

        # set the parameters structure depending of the API
        if "openai" in self.sambastudio_url:
            if "select_expert" in _model_kwargs:
                _model_kwargs["model"] = _model_kwargs.pop("select_expert")
            if "max_tokens_to_generate" in _model_kwargs:
                _model_kwargs["max_tokens"] = _model_kwargs.pop(
                    "max_tokens_to_generate"
                )
            if "process_prompt" in _model_kwargs:
                _model_kwargs.pop("process_prompt")
            tuning_params = _model_kwargs

        elif "api/v2/predict/generic" in self.sambastudio_url:
            if "model" in _model_kwargs:
                _model_kwargs["select_expert"] = _model_kwargs.pop("model")
            if "max_tokens" in _model_kwargs:
                _model_kwargs["max_tokens_to_generate"] = _model_kwargs.pop(
                    "max_tokens"
                )
            tuning_params = _model_kwargs

        elif "api/predict/generic" in self.sambastudio_url:
            if "model" in _model_kwargs:
                _model_kwargs["select_expert"] = _model_kwargs.pop("model")
            if "max_tokens" in _model_kwargs:
                _model_kwargs["max_tokens_to_generate"] = _model_kwargs.pop(
                    "max_tokens"
                )
            # generic v1 sends every parameter as a {"type", "value"} pair
            # with the value rendered as a string
            tuning_params = {
                k: {"type": type(v).__name__, "value": str(v)}
                for k, v in _model_kwargs.items()
            }

        else:
            raise ValueError(
                f"Unsupported URL{self.sambastudio_url}"
                "only openai, generic v1 and generic v2 APIs are supported"
            )

        return tuning_params

    def _handle_request(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        streaming: Optional[bool] = False,
    ) -> Response:
        """
        Performs a post request to the LLM API.

        Args:
            prompt: The prompt to pass into the model
                (only the first element is used when a list is given)
            stop: list of stop tokens
            streaming: whether to do a streaming call

        Returns:
            A request Response object

        Raises:
            ValueError: if ``sambastudio_url`` matches none of the supported APIs.
            RuntimeError: if the API answers with a status code other than 200.
        """
        if isinstance(prompt, str):
            prompt = [prompt]
        first_prompt = prompt[0]

        params = self._get_tuning_params(stop)
        api_key = self.sambastudio_api_key.get_secret_value()

        # create request payload for openAI v1 API
        if "openai" in self.sambastudio_url:
            messages_dict = [{"role": "user", "content": first_prompt}]
            data = {"messages": messages_dict, "stream": streaming, **params}
            data = {key: value for key, value in data.items() if value is not None}
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }

        # create request payload for generic v2 API
        elif "api/v2/predict/generic" in self.sambastudio_url:
            if params.get("process_prompt", False):
                item_value = json.dumps(
                    {
                        "conversation_id": "sambaverse-conversation-id",
                        "messages": [
                            {
                                "message_id": None,
                                "role": "user",
                                "content": first_prompt,
                            }
                        ],
                    }
                )
            else:
                item_value = first_prompt
            items = [{"id": "item0", "value": item_value}]
            params = {key: value for key, value in params.items() if value is not None}
            data = {"items": items, "params": params}
            headers = {"key": api_key}

        # create request payload for generic v1 API
        elif "api/predict/generic" in self.sambastudio_url:
            # in generic v1 every parameter is a {"type", "value"} mapping, so
            # the flag itself is the string stored under "value"
            process_prompt = params.get("process_prompt", False)
            if process_prompt and process_prompt.get("value") == "True":
                instance = json.dumps(
                    {
                        "conversation_id": "sambaverse-conversation-id",
                        "messages": [
                            {
                                "message_id": None,
                                "role": "user",
                                "content": first_prompt,
                            }
                        ],
                    }
                )
            else:
                instance = first_prompt
            if streaming:
                data = {"instance": instance, "params": params}
            else:
                data = {"instances": [instance], "params": params}
            headers = {"key": api_key}

        else:
            raise ValueError(
                f"Unsupported URL{self.sambastudio_url}"
                "only openai, generic v1 and generic v2 APIs are supported"
            )

        # make the request to SambaStudio API
        # The session is closed on exit so its connection pool is not leaked;
        # a streamed response keeps its own checked-out connection and can
        # still be consumed afterwards (same as ``requests.post(stream=True)``).
        with requests.Session() as http_session:
            if streaming:
                response = http_session.post(
                    self.streaming_url, headers=headers, json=data, stream=True
                )
            else:
                response = http_session.post(
                    self.base_url, headers=headers, json=data, stream=False
                )
            if response.status_code != 200:
                raise RuntimeError(
                    f"Sambanova / complete call failed with status code "
                    f"{response.status_code}."
                    f"{response.text}."
                )
        return response

    def _process_response(self, response: Response) -> str:
        """
        Process a non streaming response from the api

        Args:
            response: A request Response object

        Returns
            completion: a string with model generation

        Raises:
            RuntimeError: if the response body is not valid JSON.
            ValueError: if ``sambastudio_url`` matches none of the supported APIs.
        """
        # Extract json payload form response
        try:
            response_dict = response.json()
        except Exception as e:
            raise RuntimeError(
                f"Sambanova /complete call failed couldn't get JSON response {e}"
                f"response: {response.text}"
            ) from e

        # process response payload for openai compatible API
        if "openai" in self.sambastudio_url:
            completion = response_dict["choices"][0]["message"]["content"]
        # process response payload for generic v2 API
        elif "api/v2/predict/generic" in self.sambastudio_url:
            completion = response_dict["items"][0]["value"]["completion"]
        # process response payload for generic v1 API
        elif "api/predict/generic" in self.sambastudio_url:
            completion = response_dict["predictions"][0]["completion"]
        else:
            raise ValueError(
                f"Unsupported URL{self.sambastudio_url}"
                "only openai, generic v1 and generic v2 APIs are supported"
            )
        return completion

    def _process_stream_response(self, response: Response) -> Iterator[GenerationChunk]:
        """
        Process a streaming response from the api

        Args:
            response: An iterable request Response object

        Yields:
            GenerationChunk: a GenerationChunk with model partial generation

        Raises:
            ImportError: if the ``sseclient`` package is not installed.
            RuntimeError: if an error event is received or a chunk cannot be parsed.
            ValueError: if ``sambastudio_url`` matches none of the supported APIs.
        """
        try:
            import sseclient
        except ImportError as e:
            raise ImportError(
                "could not import sseclient library"
                "Please install it with `pip install sseclient-py`."
            ) from e

        # process response payload for openai compatible API
        if "openai" in self.sambastudio_url:
            client = sseclient.SSEClient(response)
            for event in client.events():
                if event.event == "error_event":
                    raise RuntimeError(
                        f"Sambanova /complete call failed with status code "
                        f"{response.status_code}."
                        f"{event.data}."
                    )
                # the final event ("[DONE]") carries no payload
                if event.data == "[DONE]":
                    continue
                try:
                    if isinstance(event.data, str):
                        data = json.loads(event.data)
                    else:
                        raise RuntimeError(
                            f"Sambanova /complete call failed with status code "
                            f"{response.status_code}."
                            f"{event.data}."
                        )
                    if data.get("error"):
                        raise RuntimeError(
                            f"Sambanova /complete call failed with status code "
                            f"{response.status_code}."
                            f"{event.data}."
                        )
                    if len(data["choices"]) > 0:
                        # a chunk may carry a delta without text (missing or
                        # null "content"); treat it as an empty piece of text
                        delta = data["choices"][0].get("delta") or {}
                        content = delta.get("content") or ""
                    else:
                        content = ""
                    generated_chunk = GenerationChunk(text=content)
                except Exception as e:
                    raise RuntimeError(
                        f"Error getting content chunk raw streamed response: {e}"
                        f"data: {event.data}"
                    ) from e
                yield generated_chunk

        # process response payload for generic v2 API
        elif "api/v2/predict/generic" in self.sambastudio_url:
            for line in response.iter_lines():
                # iter_lines can emit empty keep-alive lines; they hold no JSON
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    content = data["result"]["items"][0]["value"]["stream_token"]
                    generated_chunk = GenerationChunk(text=content)
                except Exception as e:
                    raise RuntimeError(
                        f"Error getting content chunk raw streamed response: {e}"
                        f"line: {line}"
                    ) from e
                yield generated_chunk

        # process response payload for generic v1 API
        elif "api/predict/generic" in self.sambastudio_url:
            for line in response.iter_lines():
                # iter_lines can emit empty keep-alive lines; they hold no JSON
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    content = data["result"]["responses"][0]["stream_token"]
                    generated_chunk = GenerationChunk(text=content)
                except Exception as e:
                    raise RuntimeError(
                        f"Error getting content chunk raw streamed response: {e}"
                        f"line: {line}"
                    ) from e
                yield generated_chunk

        else:
            raise ValueError(
                f"Unsupported URL{self.sambastudio_url}"
                "only openai, generic v1 and generic v2 APIs are supported"
            )

    def _stream(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Call out to Sambanova's complete endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: a list of strings on which the model should stop generating.
            run_manager: A run manager with callbacks for the LLM.

        Yields:
            chunk: GenerationChunk with model partial generation
        """
        response = self._handle_request(prompt, stop, streaming=True)
        for chunk in self._process_stream_response(response):
            if run_manager:
                run_manager.on_llm_new_token(chunk.text)
            yield chunk

    def _call(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Sambanova's complete endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: a list of strings on which the model should stop generating.
            run_manager: A run manager with callbacks for the LLM
                (only used on the streaming path).

        Returns:
            result: string with model generation
        """
        if self.streaming:
            # reuse the streaming handler and stitch the chunks together
            return "".join(
                chunk.text
                for chunk in self._stream(
                    prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
                )
            )

        response = self._handle_request(prompt, stop, streaming=False)
        completion = self._process_response(response)
        return completion
class SambaNovaCloud(LLM):
    """
    SambaNova Cloud large language models.

    Setup:
        To use, you should have the environment variables:
        ``SAMBANOVA_URL`` set with SambaNova Cloud URL.
        defaults to https://api.sambanova.ai/v1/chat/completions
        ``SAMBANOVA_API_KEY`` set with your SambaNova Cloud API Key.

        Example:

        .. code-block:: python

            from langchain_community.llms.sambanova import SambaNovaCloud
            SambaNovaCloud(
                sambanova_api_key="your-SambaNovaCloud-API-key",
                model = model name,
                max_tokens = max number of tokens to generate,
                temperature = model temperature,
                top_p = model top p,
                top_k = model top k
            )

    Key init args — completion params:
        model: str
            The name of the model to use, e.g., Meta-Llama-3-70B-Instruct-4096
            (set for CoE endpoints).
        streaming: bool
            Whether to use streaming handler when using non streaming methods
        max_tokens: int
            max tokens to generate
        temperature: float
            model temperature
        top_p: float
            model top p
        top_k: int
            model top k

    Key init args — client params:
        sambanova_url: str
            SambaNovaCloud Url
            defaults to https://api.sambanova.ai/v1/chat/completions
        sambanova_api_key: str
            SambaNovaCloud api key

    Instantiate:
        .. code-block:: python

            from langchain_community.llms.sambanova import SambaNovaCloud
            llm = SambaNovaCloud(
                sambanova_api_key="your-SambaNovaCloud-API-key",
                model = model name,
                max_tokens = max number of tokens to generate,
                temperature = model temperature,
                top_p = model top p,
                top_k = model top k
            )

    Invoke:
        .. code-block:: python

            prompt = "tell me a joke"
            response = llm.invoke(prompt)

    Stream:
        .. code-block:: python

            for chunk in llm.stream(prompt):
                print(chunk, end="", flush=True)

    Async:
        .. code-block:: python

            response = llm.ainvoke(prompt)
            await response
    """

    sambanova_url: str = Field(default="")
    """SambaNova Cloud Url"""

    sambanova_api_key: SecretStr = Field(default="")
    """SambaNova Cloud api key"""

    model: str = Field(default="Meta-Llama-3.1-8B-Instruct")
    """The name of the model"""

    streaming: bool = Field(default=False)
    """Whether to use streaming handler when using non streaming methods"""

    max_tokens: int = Field(default=1024)
    """max tokens to generate"""

    temperature: float = Field(default=0.7)
    """model temperature"""

    top_p: Optional[float] = Field(default=None)
    """model top p"""

    top_k: Optional[int] = Field(default=None)
    """model top k"""

    # NOTE(review): this field is reported by ``_identifying_params`` but is not
    # part of the payload built in ``_handle_request`` — confirm whether it
    # should be sent on streaming calls.
    stream_options: dict = Field(default={"include_usage": True})
    """stream options, include usage to get generation metrics"""

    class Config:
        populate_by_name = True

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this model can be serialized by Langchain."""
        return False

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Map constructor argument names to the secret ids that hold them."""
        return {"sambanova_api_key": "sambanova_api_key"}

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters.

        This information is used by the LangChain callback system, which
        is used for tracing purposes make it possible to monitor LLMs.
        """
        return {
            "model": self.model,
            "streaming": self.streaming,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "stream_options": self.stream_options,
        }

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this model."""
        return "sambanovacloud-llm"

    def __init__(self, **kwargs: Any) -> None:
        """Resolve the URL and API key (arguments first, then environment
        variables, then the built-in default URL) and initialise the model.
        """
        kwargs["sambanova_url"] = get_from_dict_or_env(
            kwargs,
            "sambanova_url",
            "SAMBANOVA_URL",
            default="https://api.sambanova.ai/v1/chat/completions",
        )
        kwargs["sambanova_api_key"] = convert_to_secret_str(
            get_from_dict_or_env(kwargs, "sambanova_api_key", "SAMBANOVA_API_KEY")
        )
        super().__init__(**kwargs)

    def _handle_request(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        streaming: Optional[bool] = False,
    ) -> Response:
        """
        Performs a post request to the LLM API.

        Args:
            prompt: The prompt to pass into the model
                (only the first element is used when a list is given).
            stop: list of stop tokens
            streaming: whether to do a streaming call

        Returns:
            A request Response object

        Raises:
            RuntimeError: if the API answers with a status code other than 200.
        """
        if isinstance(prompt, str):
            prompt = [prompt]

        messages_dict = [{"role": "user", "content": prompt[0]}]
        data = {
            "messages": messages_dict,
            "stream": streaming,
            "max_tokens": self.max_tokens,
            "stop": stop,
            "model": self.model,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
        }
        # unset (None) parameters are left out of the payload
        data = {key: value for key, value in data.items() if value is not None}
        headers = {
            "Authorization": f"Bearer {self.sambanova_api_key.get_secret_value()}",
            "Content-Type": "application/json",
        }

        # The session is closed on exit so its connection pool is not leaked;
        # a streamed response keeps its own checked-out connection and can
        # still be consumed afterwards (same as ``requests.post(stream=True)``).
        with requests.Session() as http_session:
            response = http_session.post(
                self.sambanova_url,
                headers=headers,
                json=data,
                stream=bool(streaming),
            )
            if response.status_code != 200:
                raise RuntimeError(
                    f"Sambanova / complete call failed with status code "
                    f"{response.status_code}."
                    f"{response.text}."
                )
        return response

    def _process_response(self, response: Response) -> str:
        """
        Process a non streaming response from the api

        Args:
            response: A request Response object

        Returns
            completion: a string with model generation

        Raises:
            RuntimeError: if the response body is not valid JSON.
        """
        # Extract json payload form response
        try:
            response_dict = response.json()
        except Exception as e:
            raise RuntimeError(
                f"Sambanova /complete call failed couldn't get JSON response {e}"
                f"response: {response.text}"
            ) from e

        completion = response_dict["choices"][0]["message"]["content"]
        return completion

    def _process_stream_response(self, response: Response) -> Iterator[GenerationChunk]:
        """
        Process a streaming response from the api

        Args:
            response: An iterable request Response object

        Yields:
            GenerationChunk: a GenerationChunk with model partial generation

        Raises:
            ImportError: if the ``sseclient`` package is not installed.
            RuntimeError: if an error event is received or a chunk cannot be parsed.
        """
        try:
            import sseclient
        except ImportError as e:
            raise ImportError(
                "could not import sseclient library"
                "Please install it with `pip install sseclient-py`."
            ) from e

        client = sseclient.SSEClient(response)
        for event in client.events():
            if event.event == "error_event":
                raise RuntimeError(
                    f"Sambanova /complete call failed with status code "
                    f"{response.status_code}."
                    f"{event.data}."
                )
            # the final event ("[DONE]") carries no payload
            if event.data == "[DONE]":
                continue
            try:
                if isinstance(event.data, str):
                    data = json.loads(event.data)
                else:
                    raise RuntimeError(
                        f"Sambanova /complete call failed with status code "
                        f"{response.status_code}."
                        f"{event.data}."
                    )
                if data.get("error"):
                    raise RuntimeError(
                        f"Sambanova /complete call failed with status code "
                        f"{response.status_code}."
                        f"{event.data}."
                    )
                if len(data["choices"]) > 0:
                    # a chunk may carry a delta without text (missing or
                    # null "content"); treat it as an empty piece of text
                    delta = data["choices"][0].get("delta") or {}
                    content = delta.get("content") or ""
                else:
                    content = ""
                generated_chunk = GenerationChunk(text=content)
            except Exception as e:
                raise RuntimeError(
                    f"Error getting content chunk raw streamed response: {e}"
                    f"data: {event.data}"
                ) from e
            yield generated_chunk

    def _call(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to SambaNovaCloud complete endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: A run manager with callbacks for the LLM
                (only used on the streaming path).

        Returns:
            The string generated by the model.
        """
        if self.streaming:
            # reuse the streaming handler and stitch the chunks together
            return "".join(
                chunk.text
                for chunk in self._stream(
                    prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
                )
            )

        response = self._handle_request(prompt, stop, streaming=False)
        completion = self._process_response(response)
        return completion

    def _stream(
        self,
        prompt: Union[List[str], str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Call out to SambaNovaCloud complete endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: A run manager with callbacks for the LLM.

        Yields:
            chunk: GenerationChunk with model partial generation
        """
        response = self._handle_request(prompt, stop, streaming=True)
        for chunk in self._process_stream_response(response):
            if run_manager:
                run_manager.on_llm_new_token(chunk.text)
            yield chunk