from __future__ import annotations

import asyncio
import functools
import logging
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Tuple,
    TypeVar,
)

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
from langchain_core.utils import get_from_dict_or_env, pre_init
from pydantic import Field
from requests.exceptions import HTTPError
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger(__name__)

T = TypeVar("T")


def _create_retry_decorator(llm: Tongyi) -> Callable[[Any], Any]:
    min_seconds = 1
    max_seconds = 4
    # Wait 2^x seconds between retries, clamped between `min_seconds` and
    # `max_seconds`, and give up after `llm.max_retries` attempts.
    return retry(
        reraise=True,
        stop=stop_after_attempt(llm.max_retries),
        wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
        retry=(retry_if_exception_type(HTTPError)),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
def check_response(resp: Any) -> Any:
    """Check the response from the completion call."""
    if resp["status_code"] == 200:
        return resp
    elif resp["status_code"] in [400, 401]:
        raise ValueError(
            f"request_id: {resp['request_id']}\n "
            f"status_code: {resp['status_code']}\n "
            f"code: {resp['code']}\n message: {resp['message']}"
        )
    else:
        raise HTTPError(
            f"HTTP error occurred: status_code: {resp['status_code']}\n "
            f"code: {resp['code']}\n message: {resp['message']}",
            response=resp,
        )
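# Illustrative shape of a response that `check_response` accepts (field values
# are hypothetical and only sketch the keys this module reads; not part of the
# original source):
#
#     {
#         "status_code": 200,
#         "request_id": "...",
#         "code": "",
#         "message": "",
#         "output": {"text": "...", "finish_reason": "stop"},
#         "usage": {"input_tokens": 10, "output_tokens": 20},
#     }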
def generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
    """Use tenacity to retry the completion call."""
    retry_decorator = _create_retry_decorator(llm)

    @retry_decorator
    def _generate_with_retry(**_kwargs: Any) -> Any:
        resp = llm.client.call(**_kwargs)
        return check_response(resp)

    return _generate_with_retry(**kwargs)
def stream_generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
    """Use tenacity to retry the completion call."""
    retry_decorator = _create_retry_decorator(llm)

    @retry_decorator
    def _stream_generate_with_retry(**_kwargs: Any) -> Any:
        responses = llm.client.call(**_kwargs)
        for resp in responses:
            yield check_response(resp)

    return _stream_generate_with_retry(**kwargs)
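# Usage sketch (hypothetical `llm` instance and keyword arguments, not part of
# the original source): the returned generator yields one validated dashscope
# response per streamed chunk.
#
#     for resp in stream_generate_with_retry(llm, prompt="hello", stream=True):
#         print(resp["output"]["text"])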
async def astream_generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
    """Async version of `stream_generate_with_retry`.

    Because the dashscope SDK doesn't provide an async API,
    we wrap `stream_generate_with_retry` with an async generator.
    """

    class _AioTongyiGenerator:
        def __init__(self, _llm: Tongyi, **_kwargs: Any):
            self.generator = stream_generate_with_retry(_llm, **_kwargs)

        def __aiter__(self) -> AsyncIterator[Any]:
            return self

        async def __anext__(self) -> Any:
            value = await asyncio.get_running_loop().run_in_executor(
                None, self._safe_next
            )
            if value is not None:
                return value
            else:
                raise StopAsyncIteration

        def _safe_next(self) -> Any:
            try:
                return next(self.generator)
            except StopIteration:
                return None

    async for chunk in _AioTongyiGenerator(llm, **kwargs):
        yield chunk
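# Usage sketch (hypothetical `llm` instance, not part of the original source):
# the sync generator is driven in a thread-pool executor, so the event loop is
# not blocked while waiting for the next chunk.
#
#     async def consume(llm: Tongyi) -> None:
#         async for resp in astream_generate_with_retry(llm, prompt="hello", stream=True):
#             print(resp["output"]["text"])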
def generate_with_last_element_mark(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    """Generate elements from an iterable,
    and a boolean indicating if it is the last element."""
    iterator = iter(iterable)
    try:
        item = next(iterator)
    except StopIteration:
        return
    for next_item in iterator:
        yield item, False
        item = next_item
    yield item, True
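# Example (illustrative, not part of the original source):
#
#     >>> list(generate_with_last_element_mark(["a", "b", "c"]))
#     [('a', False), ('b', False), ('c', True)]
#     >>> list(generate_with_last_element_mark([]))
#     []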
async def agenerate_with_last_element_mark(
    iterable: AsyncIterable[T],
) -> AsyncIterator[Tuple[T, bool]]:
    """Generate elements from an async iterable,
    and a boolean indicating if it is the last element."""
    iterator = iterable.__aiter__()
    try:
        item = await iterator.__anext__()
    except StopAsyncIteration:
        return
    async for next_item in iterator:
        yield item, False
        item = next_item
    yield item, True
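# Example (illustrative, not part of the original source): the async variant
# behaves the same way for async iterables.
#
#     async def demo() -> None:
#         async def gen() -> AsyncIterator[str]:
#             for x in ("a", "b"):
#                 yield x
#
#         async for item, is_last in agenerate_with_last_element_mark(gen()):
#             print(item, is_last)  # prints "a False" then "b True"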
class Tongyi(BaseLLM):
    """Tongyi completion model integration.

    Setup:
        Install ``dashscope`` and set the environment variable ``DASHSCOPE_API_KEY``.

        .. code-block:: bash

            pip install dashscope
            export DASHSCOPE_API_KEY="your-api-key"

    Key init args — completion params:
        model: str
            Name of Tongyi model to use.
        top_p: float
            Total probability mass of tokens to consider at each step.
        streaming: bool
            Whether to stream the results or not.

    Key init args — client params:
        api_key: Optional[str]
            Dashscope API key. If not passed in, it will be read from
            the env var DASHSCOPE_API_KEY.
        max_retries: int
            Maximum number of retries to make when generating.

    See full list of supported init args and their descriptions in the params section.

    Instantiate:
        .. code-block:: python

            from langchain_community.llms import Tongyi

            llm = Tongyi(
                model="qwen-max",
                # top_p="...",
                # api_key="...",
                # other params...
            )

    Invoke:
        .. code-block:: python

            input_text = "用50个字左右阐述,生命的意义在于"
            llm.invoke(input_text)

        .. code-block:: python

            '探索、成长、连接与爱——在有限的时间里,不断学习、体验、贡献并寻找与世界和谐共存之道,让每一刻充满价值与意义。'

    Stream:
        .. code-block:: python

            for chunk in llm.stream(input_text):
                print(chunk)

        .. code-block:: python

            探索 | 、 | 成长 | 、连接与爱。 | 在有限的时间里,寻找个人价值, | 贡献于他人,共同体验世界的美好 | ,让世界因自己的存在而更 | 温暖。

    Async:
        .. code-block:: python

            await llm.ainvoke(input_text)

            # stream:
            # async for chunk in llm.astream(input_text):
            #     print(chunk)

            # batch:
            # await llm.abatch([input_text])

        .. code-block:: python

            '探索、成长、连接与爱。在有限的时间里,寻找个人价值,贡献于他人和社会,体验丰富多彩的情感与经历,不断学习进步,让世界因自己的存在而更美好。'
    """  # noqa: E501

    @property
    def lc_secrets(self) -> Dict[str, str]:
        return {"dashscope_api_key": "DASHSCOPE_API_KEY"}

    client: Any = None  #: :meta private:
    model_name: str = Field(default="qwen-plus", alias="model")
    """Model name to use."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    top_p: float = 0.8
    """Total probability mass of tokens to consider at each step."""
    dashscope_api_key: Optional[str] = Field(default=None, alias="api_key")
    """Dashscope API key provided by Alibaba Cloud."""
    streaming: bool = False
    """Whether to stream the results or not."""
    max_retries: int = 10
    """Maximum number of retries to make when generating."""

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "tongyi"
    @pre_init
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the api key and python package exist in the environment."""
        values["dashscope_api_key"] = get_from_dict_or_env(
            values, ["dashscope_api_key", "api_key"], "DASHSCOPE_API_KEY"
        )
        try:
            import dashscope
        except ImportError:
            raise ImportError(
                "Could not import dashscope python package. "
                "Please install it with `pip install dashscope`."
            )
        try:
            values["client"] = dashscope.Generation
        except AttributeError:
            raise ValueError(
                "`dashscope` has no `Generation` attribute, this is likely "
                "due to an old version of the dashscope package. Try upgrading it "
                "with `pip install --upgrade dashscope`."
            )
        return values
    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling Tongyi Qwen API."""
        normal_params = {
            "model": self.model_name,
            "top_p": self.top_p,
            "api_key": self.dashscope_api_key,
        }
        return {**normal_params, **self.model_kwargs}

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {"model_name": self.model_name, **super()._identifying_params}

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        generations = []
        if self.streaming:
            if len(prompts) > 1:
                raise ValueError("Cannot stream results with multiple prompts.")
            generation: Optional[GenerationChunk] = None
            for chunk in self._stream(prompts[0], stop, run_manager, **kwargs):
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
            assert generation is not None
            generations.append([self._chunk_to_generation(generation)])
        else:
            params: Dict[str, Any] = self._invocation_params(stop=stop, **kwargs)
            for prompt in prompts:
                completion = generate_with_retry(self, prompt=prompt, **params)
                generations.append(
                    [Generation(**self._generation_from_qwen_resp(completion))]
                )
        return LLMResult(
            generations=generations,
            llm_output={
                "model_name": self.model_name,
            },
        )

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        generations = []
        if self.streaming:
            if len(prompts) > 1:
                raise ValueError("Cannot stream results with multiple prompts.")
            generation: Optional[GenerationChunk] = None
            async for chunk in self._astream(prompts[0], stop, run_manager, **kwargs):
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
            assert generation is not None
            generations.append([self._chunk_to_generation(generation)])
        else:
            params: Dict[str, Any] = self._invocation_params(stop=stop, **kwargs)
            for prompt in prompts:
                completion = await asyncio.get_running_loop().run_in_executor(
                    None,
                    functools.partial(
                        generate_with_retry, **{"llm": self, "prompt": prompt, **params}
                    ),
                )
                generations.append(
                    [Generation(**self._generation_from_qwen_resp(completion))]
                )
        return LLMResult(
            generations=generations,
            llm_output={
                "model_name": self.model_name,
            },
        )

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        params: Dict[str, Any] = self._invocation_params(
            stop=stop, stream=True, **kwargs
        )
        for stream_resp, is_last_chunk in generate_with_last_element_mark(
            stream_generate_with_retry(self, prompt=prompt, **params)
        ):
            chunk = GenerationChunk(
                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
            )
            if run_manager:
                run_manager.on_llm_new_token(
                    chunk.text,
                    chunk=chunk,
                    verbose=self.verbose,
                )
            yield chunk

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        params: Dict[str, Any] = self._invocation_params(
            stop=stop, stream=True, **kwargs
        )
        async for stream_resp, is_last_chunk in agenerate_with_last_element_mark(
            astream_generate_with_retry(self, prompt=prompt, **params)
        ):
            chunk = GenerationChunk(
                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
            )
            if run_manager:
                await run_manager.on_llm_new_token(
                    chunk.text,
                    chunk=chunk,
                    verbose=self.verbose,
                )
            yield chunk

    def _invocation_params(self, stop: Any, **kwargs: Any) -> Dict[str, Any]:
        params = {
            **self._default_params,
            **kwargs,
        }
        if stop is not None:
            params["stop"] = stop
        if params.get("stream"):
            params["incremental_output"] = True
        return params

    @staticmethod
    def _generation_from_qwen_resp(
        resp: Any, is_last_chunk: bool = True
    ) -> Dict[str, Any]:
        # According to the response from dashscope,
        # each chunk's `generation_info` overwrites the previous one.
        # Besides, the `merge_dicts` method,
        # which is used to concatenate `generation_info` in `GenerationChunk`,
        # does not support merging of int type values.
        # Therefore, we adopt the `generation_info` of the last chunk
        # and discard the `generation_info` of the intermediate chunks.
        if is_last_chunk:
            return dict(
                text=resp["output"]["text"],
                generation_info=dict(
                    finish_reason=resp["output"]["finish_reason"],
                    request_id=resp["request_id"],
                    token_usage=dict(resp["usage"]),
                ),
            )
        else:
            return dict(text=resp["output"]["text"])

    @staticmethod
    def _chunk_to_generation(chunk: GenerationChunk) -> Generation:
        return Generation(
            text=chunk.text,
            generation_info=chunk.generation_info,
        )
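# Illustrative sketch (hypothetical `resp` values, not part of the original
# source): `_generation_from_qwen_resp` keeps `generation_info` only for the
# last chunk, so intermediate chunks carry just the text.
#
#     Tongyi._generation_from_qwen_resp(resp, is_last_chunk=True)
#     # -> {"text": "...", "generation_info": {"finish_reason": "stop",
#     #     "request_id": "...", "token_usage": {...}}}
#     Tongyi._generation_from_qwen_resp(resp, is_last_chunk=False)
#     # -> {"text": "..."}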