class HuggingFacePipeline(BaseLLM):
    """HuggingFace Pipeline API.

    To use, you should have the ``transformers`` python package installed.

    Only supports `text-generation`, `text2text-generation`, `summarization` and
    `translation` for now.

    Example using from_model_id:
        .. code-block:: python

            from langchain_huggingface import HuggingFacePipeline

            hf = HuggingFacePipeline.from_model_id(
                model_id="gpt2",
                task="text-generation",
                pipeline_kwargs={"max_new_tokens": 10},
            )

    Example passing pipeline in directly:
        .. code-block:: python

            from langchain_huggingface import HuggingFacePipeline
            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

            model_id = "gpt2"
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModelForCausalLM.from_pretrained(model_id)
            pipe = pipeline(
                "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10
            )
            hf = HuggingFacePipeline(pipeline=pipe)
    """

    pipeline: Any = None  #: :meta private:
    model_id: Optional[str] = None
    """The model name. If not set explicitly by the user, it will be inferred
    from the provided pipeline (if available). If neither is provided, the
    DEFAULT_MODEL_ID will be used."""
    model_kwargs: Optional[dict] = None
    """Keyword arguments passed to the model."""
    pipeline_kwargs: Optional[dict] = None
    """Keyword arguments passed to the pipeline."""
    batch_size: int = DEFAULT_BATCH_SIZE
    """Batch size to use when passing multiple documents to generate."""

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def pre_init_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure model_id is set either by pipeline or user input."""
        if "model_id" not in values:
            if "pipeline" in values and values["pipeline"]:
                values["model_id"] = values["pipeline"].model.name_or_path
            else:
                values["model_id"] = DEFAULT_MODEL_ID
        return values
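    # Illustrative sketch (not part of the class): when only a ``pipeline`` is
    # passed, the validator above infers ``model_id`` from
    # ``pipeline.model.name_or_path``; if neither is given, it falls back to
    # DEFAULT_MODEL_ID. For example:
    #
    #     pipe = pipeline("text-generation", model="gpt2", max_new_tokens=10)
    #     hf = HuggingFacePipeline(pipeline=pipe)
    #     assert hf.model_id == "gpt2"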
    @classmethod
    def from_model_id(
        cls,
        model_id: str,
        task: str,
        backend: str = "default",
        device: Optional[int] = None,
        device_map: Optional[str] = None,
        model_kwargs: Optional[dict] = None,
        pipeline_kwargs: Optional[dict] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        **kwargs: Any,
    ) -> HuggingFacePipeline:
        """Construct the pipeline object from model_id and task."""
        try:
            from transformers import (  # type: ignore[import]
                AutoModelForCausalLM,
                AutoModelForSeq2SeqLM,
                AutoTokenizer,
            )
            from transformers import pipeline as hf_pipeline  # type: ignore[import]
        except ImportError:
            raise ValueError(
                "Could not import transformers python package. "
                "Please install it with `pip install transformers`."
            )

        _model_kwargs = model_kwargs.copy() if model_kwargs else {}
        if device_map is not None:
            if device is not None:
                raise ValueError(
                    "Both `device` and `device_map` are specified. "
                    "`device` will override `device_map`. "
                    "You will most likely encounter unexpected behavior. "
                    "Please remove `device` and keep `device_map`."
                )
            if "device_map" in _model_kwargs:
                raise ValueError("`device_map` is already specified in `model_kwargs`.")

            _model_kwargs["device_map"] = device_map
        tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)

        if backend in {"openvino", "ipex"}:
            if task not in VALID_TASKS:
                raise ValueError(
                    f"Got invalid task {task}, "
                    f"currently only {VALID_TASKS} are supported"
                )

            err_msg = f'Backend: {backend} {IMPORT_ERROR.format(f"optimum[{backend}]")}'
            if not is_optimum_intel_available():
                raise ImportError(err_msg)

            # TODO: upgrade _MIN_OPTIMUM_VERSION to 1.22 after release
            min_optimum_version = (
                "1.22"
                if backend == "ipex" and task != "text-generation"
                else _MIN_OPTIMUM_VERSION
            )
            if is_optimum_intel_version("<", min_optimum_version):
                raise ImportError(
                    f"Backend: {backend} requires optimum-intel>="
                    f"{min_optimum_version}. You can install it with pip: "
                    "`pip install --upgrade --upgrade-strategy eager "
                    f"optimum[{backend}]`."
                )

            if backend == "openvino":
                if not is_openvino_available():
                    raise ImportError(err_msg)

                from optimum.intel import (  # type: ignore[import]
                    OVModelForCausalLM,
                    OVModelForSeq2SeqLM,
                )

                model_cls = (
                    OVModelForCausalLM
                    if task == "text-generation"
                    else OVModelForSeq2SeqLM
                )
            else:
                if not is_ipex_available():
                    raise ImportError(err_msg)

                if task == "text-generation":
                    from optimum.intel import (
                        IPEXModelForCausalLM,  # type: ignore[import]
                    )

                    model_cls = IPEXModelForCausalLM
                else:
                    from optimum.intel import (
                        IPEXModelForSeq2SeqLM,  # type: ignore[import]
                    )

                    model_cls = IPEXModelForSeq2SeqLM
        else:
            model_cls = (
                AutoModelForCausalLM
                if task == "text-generation"
                else AutoModelForSeq2SeqLM
            )

        model = model_cls.from_pretrained(model_id, **_model_kwargs)

        if tokenizer.pad_token is None:
            if model.config.pad_token_id is not None:
                tokenizer.pad_token_id = model.config.pad_token_id
            elif model.config.eos_token_id is not None and isinstance(
                model.config.eos_token_id, int
            ):
                tokenizer.pad_token_id = model.config.eos_token_id
            elif tokenizer.eos_token_id is not None:
                tokenizer.pad_token_id = tokenizer.eos_token_id
            else:
                tokenizer.add_special_tokens({"pad_token": "[PAD]"})

        if (
            (
                getattr(model, "is_loaded_in_4bit", False)
                or getattr(model, "is_loaded_in_8bit", False)
            )
            and device is not None
            and backend == "default"
        ):
            logger.warning(
                f"Setting the `device` argument to None from {device} to avoid "
                "the error caused by attempting to move the model that was already "
                "loaded on the GPU using the Accelerate module to the same or "
                "another device."
            )
            device = None

        if (
            device is not None
            and importlib.util.find_spec("torch") is not None
            and backend == "default"
        ):
            import torch

            cuda_device_count = torch.cuda.device_count()
            if device < -1 or (device >= cuda_device_count):
                raise ValueError(
                    f"Got device=={device}, "
                    f"device is required to be within [-1, {cuda_device_count})"
                )
            if device_map is not None and device < 0:
                device = None
            if device is not None and device < 0 and cuda_device_count > 0:
                logger.warning(
                    "Device has %d GPUs available. "
                    "Provide device={deviceId} to `from_model_id` to use available "
                    "GPUs for execution. deviceId is -1 (default) for CPU and "
                    "can be a positive integer associated with CUDA device id.",
                    cuda_device_count,
                )
        if device is not None and device_map is not None and backend == "openvino":
            logger.warning("Please set device for OpenVINO through: `model_kwargs`")
        if "trust_remote_code" in _model_kwargs:
            _model_kwargs = {
                k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"
            }
        _pipeline_kwargs = pipeline_kwargs or {}
        pipeline = hf_pipeline(
            task=task,
            model=model,
            tokenizer=tokenizer,
            device=device,
            batch_size=batch_size,
            model_kwargs=_model_kwargs,
            **_pipeline_kwargs,
        )
        if pipeline.task not in VALID_TASKS:
            raise ValueError(
                f"Got invalid task {pipeline.task}, "
                f"currently only {VALID_TASKS} are supported"
            )
        return cls(
            pipeline=pipeline,
            model_id=model_id,
            model_kwargs=_model_kwargs,
            pipeline_kwargs=_pipeline_kwargs,
            batch_size=batch_size,
            **kwargs,
        )
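    # Illustrative sketch (not part of the class): loading through an Intel
    # backend. Assumes `optimum[openvino]` is installed; the model id and
    # kwargs below are examples only, and the OpenVINO device is passed via
    # `model_kwargs` as the warning above suggests.
    #
    #     ov_llm = HuggingFacePipeline.from_model_id(
    #         model_id="gpt2",
    #         task="text-generation",
    #         backend="openvino",
    #         model_kwargs={"device": "CPU"},
    #         pipeline_kwargs={"max_new_tokens": 10},
    #     )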
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_id": self.model_id,
            "model_kwargs": self.model_kwargs,
            "pipeline_kwargs": self.pipeline_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        return "huggingface_pipeline"

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        # List to hold all results
        text_generations: List[str] = []
        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
        skip_prompt = kwargs.get("skip_prompt", False)

        for i in range(0, len(prompts), self.batch_size):
            batch_prompts = prompts[i : i + self.batch_size]

            # Process batch of prompts
            responses = self.pipeline(
                batch_prompts,
                **pipeline_kwargs,
            )

            # Process each response in the batch
            for j, response in enumerate(responses):
                if isinstance(response, list):
                    # if model returns multiple generations, pick the top one
                    response = response[0]

                if self.pipeline.task == "text-generation":
                    text = response["generated_text"]
                elif self.pipeline.task == "text2text-generation":
                    text = response["generated_text"]
                elif self.pipeline.task == "summarization":
                    text = response["summary_text"]
                elif self.pipeline.task in "translation":
                    text = response["translation_text"]
                else:
                    raise ValueError(
                        f"Got invalid task {self.pipeline.task}, "
                        f"currently only {VALID_TASKS} are supported"
                    )
                if skip_prompt:
                    text = text[len(batch_prompts[j]) :]
                # Append the processed text to results
                text_generations.append(text)

        return LLMResult(
            generations=[[Generation(text=text)] for text in text_generations]
        )

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        from threading import Thread

        import torch
        from transformers import (
            StoppingCriteria,
            StoppingCriteriaList,
            TextIteratorStreamer,
        )

        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
        skip_prompt = kwargs.get("skip_prompt", True)

        if stop is not None:
            stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
        stopping_ids_list = stop or []

        class StopOnTokens(StoppingCriteria):
            def __call__(
                self,
                input_ids: torch.LongTensor,
                scores: torch.FloatTensor,
                **kwargs: Any,
            ) -> bool:
                for stop_id in stopping_ids_list:
                    if input_ids[0][-1] == stop_id:
                        return True
                return False

        stopping_criteria = StoppingCriteriaList([StopOnTokens()])

        streamer = TextIteratorStreamer(
            self.pipeline.tokenizer,
            timeout=60.0,
            skip_prompt=skip_prompt,
            skip_special_tokens=True,
        )
        generation_kwargs = dict(
            text_inputs=prompt,
            streamer=streamer,
            stopping_criteria=stopping_criteria,
            **pipeline_kwargs,
        )
        t1 = Thread(target=self.pipeline, kwargs=generation_kwargs)
        t1.start()

        for char in streamer:
            chunk = GenerationChunk(text=char)
            if run_manager:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)

            yield chunk
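# Illustrative usage sketch (not part of this module): `_generate` is reached
# through the standard LLM `invoke`/`batch` calls, and `_stream` through
# `stream`. Prompts and kwargs below are examples only.
#
#     hf = HuggingFacePipeline.from_model_id(
#         model_id="gpt2",
#         task="text-generation",
#         batch_size=2,
#         pipeline_kwargs={"max_new_tokens": 10},
#     )
#     print(hf.invoke("Once upon a time"))
#     for token in hf.stream("Once upon a time"):
#         print(token, end="", flush=True)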