class ChatYuan2(BaseChatModel):
    """`Yuan2.0` Chat models API.

    To use, you should have the ``openai-python`` package installed. If it is
    not installed, install it with ``pip install openai``.

    Set the environment variable ``YUAN2_API_KEY`` to your API key; if it is
    not set, the API is open to everyone.

    Any parameters that are valid to be passed to the ``openai.create`` call
    can be passed in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from langchain_community.chat_models import ChatYuan2

            chat = ChatYuan2()
    """

    client: Any = None  #: :meta private:

    async_client: Any = Field(default=None, exclude=True)  #: :meta private:

    model_name: str = Field(default="yuan2", alias="model")
    """Model name to use."""

    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not explicitly specified."""

    yuan2_api_key: Optional[str] = Field(default="EMPTY", alias="api_key")
    """Automatically inferred from env var `YUAN2_API_KEY` if not provided."""

    yuan2_api_base: Optional[str] = Field(
        default="http://127.0.0.1:8000/v1", alias="base_url"
    )
    """Base URL path for API requests, an OpenAI compatible API server."""

    request_timeout: Optional[Union[float, Tuple[float, float]]] = Field(
        default=None, alias="timeout"
    )
    """Timeout for requests to yuan2 completion API. Default is 600 seconds."""

    max_retries: int = 6
    """Maximum number of retries to make when generating."""

    streaming: bool = False
    """Whether to stream the results or not."""

    max_tokens: Optional[int] = None
    """Maximum number of tokens to generate."""

    temperature: float = 1.0
    """What sampling temperature to use."""

    top_p: Optional[float] = 0.9
    """The top-p value to use for sampling."""

    stop: Optional[List[str]] = Field(default=["<eod>"], alias="stop_sequences")
    """A list of strings to stop generation when encountered."""

    repeat_last_n: Optional[int] = 64
    """Last n tokens to penalize."""

    repeat_penalty: Optional[float] = 1.18
    """The penalty to apply to repeated tokens."""

    model_config = ConfigDict(
        populate_by_name=True,
    )

    @property
    def lc_secrets(self) -> Dict[str, str]:
        return {"yuan2_api_key": "YUAN2_API_KEY"}

    @property
    def lc_attributes(self) -> Dict[str, Any]:
        attributes: Dict[str, Any] = {}

        if self.yuan2_api_base:
            attributes["yuan2_api_base"] = self.yuan2_api_base

        if self.yuan2_api_key:
            attributes["yuan2_api_key"] = self.yuan2_api_key

        return attributes

    @model_validator(mode="before")
    @classmethod
    def build_extra(cls, values: Dict[str, Any]) -> Any:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = get_pydantic_field_names(cls)
        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name in extra:
                raise ValueError(f"Found {field_name} supplied twice.")
            if field_name not in all_required_field_names:
                logger.warning(
                    f"""WARNING! {field_name} is not a default parameter.
                    {field_name} was transferred to model_kwargs.
                    Please confirm that {field_name} is what you intended."""
                )
                extra[field_name] = values.pop(field_name)

        invalid_model_kwargs = all_required_field_names.intersection(extra.keys())
        if invalid_model_kwargs:
            raise ValueError(
                f"Parameters {invalid_model_kwargs} should be specified explicitly. "
                f"Instead they were passed in as part of `model_kwargs` parameter."
            )

        values["model_kwargs"] = extra
        return values
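    # Illustrative note on `build_extra`: a call such as `ChatYuan2(echo=True)`
    # (where `echo` is not a declared field) logs a warning and lands in
    # `chat.model_kwargs == {"echo": True}`, while putting a declared field
    # like `temperature` inside `model_kwargs` raises a ValueError.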
    @pre_init
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the api key and python package exist in the environment."""
        values["yuan2_api_key"] = get_from_dict_or_env(
            values, "yuan2_api_key", "YUAN2_API_KEY"
        )
        try:
            import openai
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )

        client_params = {
            "api_key": values["yuan2_api_key"],
            "base_url": values["yuan2_api_base"],
            "timeout": values["request_timeout"],
            "max_retries": values["max_retries"],
        }

        # Generate client and async_client.
        if not values.get("client"):
            values["client"] = openai.OpenAI(**client_params).chat.completions
        if not values.get("async_client"):
            values["async_client"] = openai.AsyncOpenAI(
                **client_params
            ).chat.completions

        return values
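    # Configuration sketch (illustrative): only the API key is read from the
    # environment here; the base URL must be passed explicitly or left at its
    # default of http://127.0.0.1:8000/v1.
    #
    #   export YUAN2_API_KEY="EMPTY"                            # shell
    #   chat = ChatYuan2(base_url="http://127.0.0.1:8000/v1")   # python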
    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling yuan2 API."""
        params = {
            "model": self.model_name,
            "stream": self.streaming,
            "temperature": self.temperature,
            "top_p": self.top_p,
            **self.model_kwargs,
        }
        if self.max_tokens is not None:
            params["max_tokens"] = self.max_tokens
        if self.request_timeout is not None:
            params["request_timeout"] = self.request_timeout
        return params
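    # With the field defaults above, `_default_params` resolves to roughly
    #   {"model": "yuan2", "stream": False, "temperature": 1.0, "top_p": 0.9}
    # plus any `model_kwargs` entries, and `max_tokens` / `request_timeout`
    # when those are set.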
    def completion_with_retry(self, **kwargs: Any) -> Any:
        """Use tenacity to retry the completion call."""
        retry_decorator = _create_retry_decorator(self)

        @retry_decorator
        def _completion_with_retry(**kwargs: Any) -> Any:
            return self.client.create(**kwargs)

        return _completion_with_retry(**kwargs)
    def _combine_llm_outputs(self, llm_outputs: List[Optional[dict]]) -> dict:
        overall_token_usage: dict = {}
        logger.debug(
            f"type(llm_outputs): {type(llm_outputs)}; llm_outputs: {llm_outputs}"
        )
        for output in llm_outputs:
            if output is None:
                # Happens in streaming
                continue
            token_usage = output["token_usage"]
            for k, v in token_usage.items():
                if k in overall_token_usage:
                    overall_token_usage[k] += v
                else:
                    overall_token_usage[k] = v
        return {"token_usage": overall_token_usage, "model_name": self.model_name}

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class = AIMessageChunk
        for chunk in self.completion_with_retry(messages=message_dicts, **params):
            if not isinstance(chunk, dict):
                chunk = chunk.model_dump()
            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]
            chunk = _convert_delta_to_message_chunk(
                choice["delta"], default_chunk_class
            )
            finish_reason = choice.get("finish_reason")
            generation_info = (
                dict(finish_reason=finish_reason)
                if finish_reason is not None
                else None
            )
            default_chunk_class = chunk.__class__
            cg_chunk = ChatGenerationChunk(
                message=chunk,
                generation_info=generation_info,
            )
            if run_manager:
                run_manager.on_llm_new_token(chunk.content, chunk=cg_chunk)
            yield cg_chunk

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        if self.streaming:
            stream_iter = self._stream(
                messages=messages, stop=stop, run_manager=run_manager, **kwargs
            )
            return generate_from_stream(stream_iter)

        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs}
        response = self.completion_with_retry(messages=message_dicts, **params)
        return self._create_chat_result(response)

    def _create_message_dicts(
        self, messages: List[BaseMessage], stop: Optional[List[str]]
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        params = dict(self._invocation_params)
        if stop is not None:
            if "stop" in params:
                raise ValueError("`stop` found in both the input and default params.")
            params["stop"] = stop
        message_dicts = [_convert_message_to_dict(m) for m in messages]
        return message_dicts, params

    def _create_chat_result(self, response: Union[dict, BaseModel]) -> ChatResult:
        generations = []
        logger.debug(f"type(response): {type(response)}; response: {response}")
        if not isinstance(response, dict):
            response = response.dict()
        for res in response["choices"]:
            message = _convert_dict_to_message(res["message"])
            generation_info = dict(finish_reason=res["finish_reason"])
            if "logprobs" in res:
                generation_info["logprobs"] = res["logprobs"]
            gen = ChatGeneration(
                message=message,
                generation_info=generation_info,
            )
            generations.append(gen)
        llm_output = {
            "token_usage": response.get("usage", {}),
            "model_name": self.model_name,
        }
        return ChatResult(generations=generations, llm_output=llm_output)

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class = AIMessageChunk
        async for chunk in await acompletion_with_retry(
            self, messages=message_dicts, **params
        ):
            if not isinstance(chunk, dict):
                chunk = chunk.model_dump()
            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]
            chunk = _convert_delta_to_message_chunk(
                choice["delta"], default_chunk_class
            )
            finish_reason = choice.get("finish_reason")
            generation_info = (
                dict(finish_reason=finish_reason)
                if finish_reason is not None
                else None
            )
            default_chunk_class = chunk.__class__
            cg_chunk = ChatGenerationChunk(
                message=chunk,
                generation_info=generation_info,
            )
            if run_manager:
                await run_manager.on_llm_new_token(chunk.content, chunk=cg_chunk)
            yield cg_chunk

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        if self.streaming:
            stream_iter = self._astream(
                messages=messages, stop=stop, run_manager=run_manager, **kwargs
            )
            return await agenerate_from_stream(stream_iter)

        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs}
        response = await acompletion_with_retry(self, messages=message_dicts, **params)
        return self._create_chat_result(response)

    @property
    def _invocation_params(self) -> Mapping[str, Any]:
        """Get the parameters used to invoke the model."""
        yuan2_creds: Dict[str, Any] = {
            "model": self.model_name,
        }
        return {**yuan2_creds, **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of chat model."""
        return "chat-yuan2"
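# A minimal usage sketch, assuming an OpenAI-compatible Yuan2.0 server is
# running at the default base URL (http://127.0.0.1:8000/v1). The helper name
# `_example_chat_yuan2` is hypothetical and only for illustration; `invoke`
# and `stream` are inherited from BaseChatModel.
def _example_chat_yuan2() -> None:
    from langchain_core.messages import HumanMessage, SystemMessage

    chat = ChatYuan2(
        model="yuan2",
        temperature=1.0,
        top_p=0.9,
    )
    messages = [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content="Hello, who are you?"),
    ]

    # Blocking call; drives `_generate` above.
    ai_message = chat.invoke(messages)
    print(ai_message.content)

    # Streaming variant; drives `_stream` above, chunks arrive as generated.
    for chunk in chat.stream(messages):
        print(chunk.content, end="", flush=True)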
def _create_retry_decorator(llm: ChatYuan2) -> Callable[[Any], Any]:
    import openai

    min_seconds = 1
    max_seconds = 60
    # Wait roughly 2^x * 1 second between retries, clamped to the
    # [min_seconds, max_seconds] range.
    return retry(
        reraise=True,
        stop=stop_after_attempt(llm.max_retries),
        wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
        retry=(
            retry_if_exception_type(openai.APITimeoutError)
            | retry_if_exception_type(openai.APIError)
            | retry_if_exception_type(openai.APIConnectionError)
            | retry_if_exception_type(openai.RateLimitError)
            | retry_if_exception_type(openai.InternalServerError)
        ),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
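# Illustrative only: the sketch below approximates the clamped exponential
# backoff schedule that `wait_exponential(multiplier=1, min=1, max=60)`
# produces between the `max_retries` attempts. It is an assumption about the
# arithmetic, not part of the tenacity API.
def _example_backoff_schedule(max_retries: int = 6) -> list[float]:
    min_seconds, max_seconds = 1.0, 60.0
    # Wait ~ multiplier * 2**attempt seconds, clamped to [min, max];
    # for max_retries=6 this yields roughly [2, 4, 8, 16, 32].
    return [
        min(max(1.0 * 2**attempt, min_seconds), max_seconds)
        for attempt in range(1, max_retries)
    ]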
async def acompletion_with_retry(llm: ChatYuan2, **kwargs: Any) -> Any:
    """Use tenacity to retry the async completion call."""
    retry_decorator = _create_retry_decorator(llm)

    @retry_decorator
    async def _completion_with_retry(**kwargs: Any) -> Any:
        # Use OpenAI's async api https://github.com/openai/openai-python#async-api
        return await llm.async_client.create(**kwargs)

    return await _completion_with_retry(**kwargs)
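# Async usage sketch, under the same local-server assumption as above;
# `_example_chat_yuan2_async` is a hypothetical helper name. `ainvoke` is
# inherited from BaseChatModel and drives `_agenerate`, which calls
# `acompletion_with_retry`.
async def _example_chat_yuan2_async() -> None:
    from langchain_core.messages import HumanMessage

    chat = ChatYuan2()
    ai_message = await chat.ainvoke([HumanMessage(content="Hello!")])
    print(ai_message.content)


# Run with: asyncio.run(_example_chat_yuan2_async())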
def _convert_delta_to_message_chunk(
    _dict: Mapping[str, Any], default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    role = _dict.get("role")
    content = _dict.get("content") or ""

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content)
    elif role == "assistant" or default_class == AIMessageChunk:
        return AIMessageChunk(content=content)
    elif role == "system" or default_class == SystemMessageChunk:
        return SystemMessageChunk(content=content)
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role)  # type: ignore[arg-type]
    else:
        return default_class(content=content)  # type: ignore[call-arg]


def _convert_dict_to_message(_dict: Mapping[str, Any]) -> BaseMessage:
    role = _dict.get("role")
    if role == "user":
        return HumanMessage(content=_dict.get("content", ""))
    elif role == "assistant":
        return AIMessage(content=_dict.get("content", ""))
    elif role == "system":
        return SystemMessage(content=_dict.get("content", ""))
    else:
        return ChatMessage(content=_dict.get("content", ""), role=role)  # type: ignore[arg-type]


def _convert_message_to_dict(message: BaseMessage) -> dict:
    """Convert a LangChain message to a dictionary.

    Args:
        message: The LangChain message.

    Returns:
        The dictionary.
    """
    message_dict: Dict[str, Any]
    if isinstance(message, ChatMessage):
        message_dict = {"role": message.role, "content": message.content}
    elif isinstance(message, HumanMessage):
        message_dict = {"role": "user", "content": message.content}
    elif isinstance(message, AIMessage):
        message_dict = {"role": "assistant", "content": message.content}
    elif isinstance(message, SystemMessage):
        message_dict = {"role": "system", "content": message.content}
    elif isinstance(message, FunctionMessage):
        message_dict = {
            "role": "function",
            "name": message.name,
            "content": message.content,
        }
    else:
        raise ValueError(f"Got unknown type {message}")
    if "name" in message.additional_kwargs:
        message_dict["name"] = message.additional_kwargs["name"]
    return message_dict
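# Round-trip sketch for the converters above (illustrative helper name):
# `_convert_message_to_dict` maps LangChain messages to OpenAI-style dicts,
# and `_convert_dict_to_message` maps response dicts back to messages.
def _example_message_round_trip() -> None:
    from langchain_core.messages import HumanMessage

    as_dict = _convert_message_to_dict(HumanMessage(content="hi"))
    assert as_dict == {"role": "user", "content": "hi"}

    back = _convert_dict_to_message({"role": "assistant", "content": "hello"})
    assert isinstance(back, AIMessage) and back.content == "hello"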