"""Wrapper around Anyscale Endpoint"""fromtypingimport(Any,Dict,List,Mapping,Optional,Set,)fromlangchain_core.callbacksimport(AsyncCallbackManagerForLLMRun,CallbackManagerForLLMRun,)fromlangchain_core.outputsimportGeneration,GenerationChunk,LLMResultfromlangchain_core.pydantic_v1importField,SecretStrfromlangchain_core.utilsimportconvert_to_secret_str,get_from_dict_or_env,pre_initfromlangchain_community.llms.openaiimport(BaseOpenAI,acompletion_with_retry,completion_with_retry,)fromlangchain_community.utils.openaiimportis_openai_v1DEFAULT_BASE_URL="https://api.endpoints.anyscale.com/v1"DEFAULT_MODEL="mistralai/Mixtral-8x7B-Instruct-v0.1"
def create_llm_result(
    choices: Any, prompts: List[str], token_usage: Dict[str, int], model_name: str
) -> LLMResult:
    """Create the LLMResult from the choices and prompts."""
    generations = []
    for i, _ in enumerate(prompts):
        choice = choices[i]
        generations.append(
            [
                Generation(
                    text=choice["message"]["content"],
                    generation_info=dict(
                        finish_reason=choice.get("finish_reason"),
                        logprobs=choice.get("logprobs"),
                    ),
                )
            ]
        )
    llm_output = {"token_usage": token_usage, "model_name": model_name}
    return LLMResult(generations=generations, llm_output=llm_output)
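
# Illustration only (not part of the original module): `create_llm_result`
# expects one chat-style choice per prompt, e.g.:
#
#     choices = [{"message": {"content": "Hi there!"}, "finish_reason": "stop"}]
#     result = create_llm_result(
#         choices, ["Say hi"], {"total_tokens": 12}, DEFAULT_MODEL
#     )
#     assert result.generations[0][0].text == "Hi there!"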
class Anyscale(BaseOpenAI):
    """Anyscale large language models.

    To use, you should have the environment variable ``ANYSCALE_API_KEY`` set
    with your Anyscale Endpoint API key, or pass it as a named parameter to the
    constructor. To use with an Anyscale Private Endpoint, please also set
    ``ANYSCALE_BASE_URL``.

    Example:
        .. code-block:: python

            from langchain.llms import Anyscale
            anyscalellm = Anyscale(anyscale_api_key="ANYSCALE_API_KEY")
            # To leverage Ray for parallel processing
            @ray.remote(num_cpus=1)
            def send_query(llm, text):
                resp = llm.invoke(text)
                return resp
            futures = [send_query.remote(anyscalellm, text) for text in texts]
            results = ray.get(futures)
    """

    # Keyword arguments to pass to the model.
    anyscale_api_base: str = Field(default=DEFAULT_BASE_URL)
    anyscale_api_key: SecretStr = Field(default=None)
    model_name: str = Field(default=DEFAULT_MODEL)

    prefix_messages: List = Field(default_factory=list)

    @classmethod
    def is_lc_serializable(cls) -> bool:
        return False

    @pre_init
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the api key and python package exist in the environment."""
        values["anyscale_api_base"] = get_from_dict_or_env(
            values,
            "anyscale_api_base",
            "ANYSCALE_API_BASE",
            default=DEFAULT_BASE_URL,
        )
        values["anyscale_api_key"] = convert_to_secret_str(
            get_from_dict_or_env(values, "anyscale_api_key", "ANYSCALE_API_KEY")
        )
        values["model_name"] = get_from_dict_or_env(
            values,
            "model_name",
            "MODEL_NAME",
            default=DEFAULT_MODEL,
        )
        try:
            import openai

            if is_openai_v1():
                client_params = {
                    "api_key": values["anyscale_api_key"].get_secret_value(),
                    "base_url": values["anyscale_api_base"],
                    # To do: future support
                    # "organization": values["openai_organization"],
                    # "timeout": values["request_timeout"],
                    # "max_retries": values["max_retries"],
                    # "default_headers": values["default_headers"],
                    # "default_query": values["default_query"],
                    # "http_client": values["http_client"],
                }
                if not values.get("client"):
                    values["client"] = openai.OpenAI(**client_params).completions
                if not values.get("async_client"):
                    values["async_client"] = openai.AsyncOpenAI(
                        **client_params
                    ).completions
            else:
                values["openai_api_base"] = values["anyscale_api_base"]
                values["openai_api_key"] = values[
                    "anyscale_api_key"
                ].get_secret_value()
                values["client"] = openai.Completion
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )

        if values["streaming"] and values["n"] > 1:
            raise ValueError("Cannot stream results when n > 1.")
        if values["streaming"] and values["best_of"] > 1:
            raise ValueError("Cannot stream results when best_of > 1.")
        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            **{"model_name": self.model_name},
            **super()._identifying_params,
        }

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        """Get the parameters used to invoke the model."""
        openai_creds: Dict[str, Any] = {
            "model": self.model_name,
        }
        if not is_openai_v1():
            openai_creds.update(
                {
                    "api_key": self.anyscale_api_key.get_secret_value(),
                    "api_base": self.anyscale_api_base,
                }
            )
        return {**openai_creds, **super()._invocation_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "Anyscale LLM"

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Call out to the Anyscale endpoint with k unique prompts.

        Args:
            prompts: The prompts to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The full LLM output.

        Example:
            .. code-block:: python

                response = anyscalellm.generate(["Tell me a joke."])
        """
        # TODO: write a unit test for this
        params = self._invocation_params
        params = {**params, **kwargs}
        sub_prompts = self.get_sub_prompts(params, prompts, stop)
        choices = []
        token_usage: Dict[str, int] = {}
        # Get the token usage from the response.
        # Includes prompt, completion, and total tokens used.
        _keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
        system_fingerprint: Optional[str] = None
        for _prompts in sub_prompts:
            if self.streaming:
                if len(_prompts) > 1:
                    raise ValueError("Cannot stream results with multiple prompts.")
                generation: Optional[GenerationChunk] = None
                for chunk in self._stream(_prompts[0], stop, run_manager, **kwargs):
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
                assert generation is not None
                choices.append(
                    {
                        "text": generation.text,
                        "finish_reason": generation.generation_info.get(
                            "finish_reason"
                        )
                        if generation.generation_info
                        else None,
                        "logprobs": generation.generation_info.get("logprobs")
                        if generation.generation_info
                        else None,
                    }
                )
            else:
                response = completion_with_retry(
                    ## This is the ONLY change from BaseOpenAI()._generate()
                    self,
                    prompt=_prompts[0],
                    run_manager=run_manager,
                    **params,
                )
                if not isinstance(response, dict):
                    # V1 client returns the response in a Pydantic object instead of
                    # a dict. For the transition period, we deep convert it to dict.
                    response = response.dict()
                choices.extend(response["choices"])
                update_token_usage(_keys, response, token_usage)
                if not system_fingerprint:
                    system_fingerprint = response.get("system_fingerprint")
        return self.create_llm_result(
            choices,
            prompts,
            params,
            token_usage,
            system_fingerprint=system_fingerprint,
        )

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Call out to the Anyscale endpoint async with k unique prompts."""
        params = self._invocation_params
        params = {**params, **kwargs}
        sub_prompts = self.get_sub_prompts(params, prompts, stop)
        choices = []
        token_usage: Dict[str, int] = {}
        # Get the token usage from the response.
        # Includes prompt, completion, and total tokens used.
        _keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
        system_fingerprint: Optional[str] = None
        for _prompts in sub_prompts:
            if self.streaming:
                if len(_prompts) > 1:
                    raise ValueError("Cannot stream results with multiple prompts.")
                generation: Optional[GenerationChunk] = None
                async for chunk in self._astream(
                    _prompts[0], stop, run_manager, **kwargs
                ):
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
                assert generation is not None
                choices.append(
                    {
                        "text": generation.text,
                        "finish_reason": generation.generation_info.get(
                            "finish_reason"
                        )
                        if generation.generation_info
                        else None,
                        "logprobs": generation.generation_info.get("logprobs")
                        if generation.generation_info
                        else None,
                    }
                )
            else:
                response = await acompletion_with_retry(
                    ## This is the ONLY change from BaseOpenAI()._agenerate()
                    self,
                    prompt=_prompts[0],
                    run_manager=run_manager,
                    **params,
                )
                if not isinstance(response, dict):
                    response = response.dict()
                choices.extend(response["choices"])
                update_token_usage(_keys, response, token_usage)
                # Mirror _generate(): record the fingerprint from the first
                # response, so it is not silently left as None in the async path.
                if not system_fingerprint:
                    system_fingerprint = response.get("system_fingerprint")
        return self.create_llm_result(
            choices,
            prompts,
            params,
            token_usage,
            system_fingerprint=system_fingerprint,
        )
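
# Example usage (a sketch, not part of the original module). Assumes
# ANYSCALE_API_KEY is set in the environment; the model name falls back to
# DEFAULT_MODEL above:
#
#     llm = Anyscale()
#     print(llm.invoke("Tell me a joke."))
#
#     # Streaming requires n == 1 and best_of == 1 (enforced in
#     # validate_environment above):
#     stream_llm = Anyscale(streaming=True)
#     for chunk in stream_llm.stream("Tell me a joke."):
#         print(chunk, end="")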