[docs]classXinference(LLM):"""`Xinference` large-scale model inference service. To use, you should have the xinference library installed: .. code-block:: bash pip install "xinference[all]" If you're simply using the services provided by Xinference, you can utilize the xinference_client package: .. code-block:: bash pip install xinference_client Check out: https://github.com/xorbitsai/inference To run, you need to start a Xinference supervisor on one server and Xinference workers on the other servers Example: To start a local instance of Xinference, run .. code-block:: bash $ xinference You can also deploy Xinference in a distributed cluster. Here are the steps: Starting the supervisor: .. code-block:: bash $ xinference-supervisor Starting the worker: .. code-block:: bash $ xinference-worker Then, launch a model using command line interface (CLI). Example: .. code-block:: bash $ xinference launch -n orca -s 3 -q q4_0 It will return a model UID. Then, you can use Xinference with LangChain. Example: .. code-block:: python from langchain_community.llms import Xinference llm = Xinference( server_url="http://0.0.0.0:9997", model_uid = {model_uid} # replace model_uid with the model UID return from launching the model ) llm.invoke( prompt="Q: where can we visit in the capital of France? A:", generate_config={"max_tokens": 1024, "stream": True}, ) Example: .. code-block:: python from langchain_community.llms import Xinference from langchain.prompts import PromptTemplate llm = Xinference( server_url="http://0.0.0.0:9997", model_uid={model_uid}, # replace model_uid with the model UID return from launching the model stream=True ) prompt = PromptTemplate( input=['country'], template="Q: where can we visit in the capital of {country}? A:" ) chain = prompt | llm chain.stream(input={'country': 'France'}) To view all the supported builtin models, run: .. code-block:: bash $ xinference list --all """# noqa: E501client:Optional[Any]=Noneserver_url:Optional[str]"""URL of the xinference server"""model_uid:Optional[str]"""UID of the launched model"""model_kwargs:Dict[str,Any]"""Keyword arguments to be passed to xinference.LLM"""def__init__(self,server_url:Optional[str]=None,model_uid:Optional[str]=None,api_key:Optional[str]=None,**model_kwargs:Any,):try:fromxinference.clientimportRESTfulClientexceptImportError:try:fromxinference_clientimportRESTfulClientexceptImportErrorase:raiseImportError("Could not import RESTfulClient from xinference. Please install it"" with `pip install xinference` or `pip install xinference_client`.")fromemodel_kwargs=model_kwargsor{}super().__init__(**{# type: ignore[arg-type]"server_url":server_url,"model_uid":model_uid,"model_kwargs":model_kwargs,})ifself.server_urlisNone:raiseValueError("Please provide server URL")ifself.model_uidisNone:raiseValueError("Please provide the model UID")self._headers:Dict[str,str]={}self._cluster_authed=Falseself._check_cluster_authenticated()ifapi_keyisnotNoneandself._cluster_authed:self._headers["Authorization"]=f"Bearer {api_key}"self.client=RESTfulClient(server_url,api_key)@propertydef_llm_type(self)->str:"""Return type of llm."""return"xinference"@propertydef_identifying_params(self)->Mapping[str,Any]:"""Get the identifying parameters."""return{**{"server_url":self.server_url},**{"model_uid":self.model_uid},**{"model_kwargs":self.model_kwargs},}def_check_cluster_authenticated(self)->None:url=f"{self.server_url}/v1/cluster/auth"response=requests.get(url)ifresponse.status_code==404:self._cluster_authed=Falseelse:ifresponse.status_code!=200:raiseRuntimeError(f"Failed to get cluster information, "f"detail: {response.json()['detail']}")response_data=response.json()self._cluster_authed=bool(response_data["auth"])def_call(self,prompt:str,stop:Optional[List[str]]=None,run_manager:Optional[CallbackManagerForLLMRun]=None,**kwargs:Any,)->str:"""Call the xinference model and return the output. Args: prompt: The prompt to use for generation. stop: Optional list of stop words to use when generating. generate_config: Optional dictionary for the configuration used for generation. Returns: The generated string by the model. """ifself.clientisNone:raiseValueError("Client is not initialized!")model=self.client.get_model(self.model_uid)generate_config:"LlamaCppGenerateConfig"=kwargs.get("generate_config",{})generate_config={**self.model_kwargs,**generate_config}ifstop:generate_config["stop"]=stopifgenerate_configandgenerate_config.get("stream"):combined_text_output=""fortokeninself._stream_generate(model=model,prompt=prompt,run_manager=run_manager,generate_config=generate_config,):combined_text_output+=tokenreturncombined_text_outputelse:completion=model.generate(prompt=prompt,generate_config=generate_config)returncompletion["choices"][0]["text"]def_stream_generate(self,model:Union["RESTfulGenerateModelHandle","RESTfulChatModelHandle"],prompt:str,run_manager:Optional[CallbackManagerForLLMRun]=None,generate_config:Optional["LlamaCppGenerateConfig"]=None,)->Generator[str,None,None]:""" Args: prompt: The prompt to use for generation. model: The model used for generation. stop: Optional list of stop words to use when generating. generate_config: Optional dictionary for the configuration used for generation. Yields: A string token. """streaming_response=model.generate(prompt=prompt,generate_config=generate_config)forchunkinstreaming_response:ifisinstance(chunk,dict):choices=chunk.get("choices",[])ifchoices:choice=choices[0]ifisinstance(choice,dict):token=choice.get("text","")log_probs=choice.get("logprobs")ifrun_manager:run_manager.on_llm_new_token(token=token,verbose=self.verbose,log_probs=log_probs)yieldtokendef_stream(self,prompt:str,stop:Optional[List[str]]=None,run_manager:Optional[CallbackManagerForLLMRun]=None,**kwargs:Any,)->Iterator[GenerationChunk]:generate_config=kwargs.get("generate_config",{})generate_config={**self.model_kwargs,**generate_config}ifstop:generate_config["stop"]=stopforstream_respinself._create_generate_stream(prompt,generate_config):ifstream_resp:chunk=self._stream_response_to_generation_chunk(stream_resp)ifrun_manager:run_manager.on_llm_new_token(chunk.text,verbose=self.verbose,)yieldchunkdef_create_generate_stream(self,prompt:str,generate_config:Optional[Dict[str,List[str]]]=None)->Iterator[str]:ifself.clientisNone:raiseValueError("Client is not initialized!")model=self.client.get_model(self.model_uid)yield frommodel.generate(prompt=prompt,generate_config=generate_config)@staticmethoddef_stream_response_to_generation_chunk(stream_response:str,)->GenerationChunk:"""Convert a stream response to a generation chunk."""token=""ifisinstance(stream_response,dict):choices=stream_response.get("choices",[])ifchoices:choice=choices[0]ifisinstance(choice,dict):token=choice.get("text","")returnGenerationChunk(text=token,generation_info=dict(finish_reason=choice.get("finish_reason",None),logprobs=choice.get("logprobs",None),),)else:raiseTypeError("choice type error!")else:returnGenerationChunk(text=token)else:raiseTypeError("stream_response type error!")asyncdef_astream(self,prompt:str,stop:Optional[List[str]]=None,run_manager:Optional[AsyncCallbackManagerForLLMRun]=None,**kwargs:Any,)->AsyncIterator[GenerationChunk]:generate_config=kwargs.get("generate_config",{})generate_config={**self.model_kwargs,**generate_config}ifstop:generate_config["stop"]=stopasyncforstream_respinself._acreate_generate_stream(prompt,generate_config):ifstream_resp:chunk=self._stream_response_to_generation_chunk(stream_resp)ifrun_manager:awaitrun_manager.on_llm_new_token(chunk.text,verbose=self.verbose,)yieldchunkasyncdef_acreate_generate_stream(self,prompt:str,generate_config:Optional[Dict[str,List[str]]]=None)->AsyncIterator[str]:request_body:Dict[str,Any]={"model":self.model_uid,"prompt":prompt}ifgenerate_configisnotNone:forkey,valueingenerate_config.items():request_body[key]=valuestream=bool(generate_configandgenerate_config.get("stream"))asyncwithaiohttp.ClientSession()assession:asyncwithsession.post(url=f"{self.server_url}/v1/completions",json=request_body,)asresponse:ifresponse.status!=200:ifresponse.status==404:raiseFileNotFoundError("astream call failed with status code 404.")else:optional_detail=response.textraiseValueError(f"astream call failed with status code {response.status}."f" Details: {optional_detail}")asyncforlineinresponse.content:ifnotstream:yieldjson.loads(line)else:json_str=line.decode("utf-8")ifline.startswith(b"data:"):json_str=json_str[len(b"data:"):].strip()ifnotjson_str:continueyieldjson.loads(json_str)