class GoogleEmbeddingModelVersion(str, Enum):
    EMBEDDINGS_JUNE_2023 = auto()
    EMBEDDINGS_NOV_2023 = auto()
    EMBEDDINGS_DEC_2023 = auto()
    EMBEDDINGS_MAY_2024 = auto()
    EMBEDDINGS_NOV_2024 = auto()

    @classmethod
    def _missing_(cls, value: Any) -> "GoogleEmbeddingModelVersion":
        if "textembedding-gecko@001" in value.lower():
            return GoogleEmbeddingModelVersion.EMBEDDINGS_JUNE_2023
        if (
            "textembedding-gecko@002" in value.lower()
            or "textembedding-gecko-multilingual@001" in value.lower()
        ):
            return GoogleEmbeddingModelVersion.EMBEDDINGS_NOV_2023
        if "textembedding-gecko@003" in value.lower():
            return GoogleEmbeddingModelVersion.EMBEDDINGS_DEC_2023
        if (
            "text-embedding-004" in value.lower()
            or "text-multilingual-embedding-002" in value.lower()
            or "text-embedding-preview-0409" in value.lower()
            or "text-multilingual-embedding-preview-0409" in value.lower()
        ):
            return GoogleEmbeddingModelVersion.EMBEDDINGS_MAY_2024
        if "text-embedding-005" in value.lower():
            return GoogleEmbeddingModelVersion.EMBEDDINGS_NOV_2024

        return GoogleEmbeddingModelVersion.EMBEDDINGS_JUNE_2023

    @property
    def task_type_supported(self) -> bool:
        """Checks if the model generation supports task type."""
        return self != GoogleEmbeddingModelVersion.EMBEDDINGS_JUNE_2023

    @property
    def output_dimensionality_supported(self) -> bool:
        """Checks if the model generation supports output dimensionality."""
        return self == GoogleEmbeddingModelVersion.EMBEDDINGS_MAY_2024
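# A minimal sketch of how the enum above resolves model names through `_missing_`
# (illustrative only; derived from the substring checks above):
#
#     GoogleEmbeddingModelVersion("textembedding-gecko@003")
#     # -> GoogleEmbeddingModelVersion.EMBEDDINGS_DEC_2023
#     GoogleEmbeddingModelVersion("text-embedding-004")
#     # -> GoogleEmbeddingModelVersion.EMBEDDINGS_MAY_2024
#     GoogleEmbeddingModelVersion("some-unrecognized-model")
#     # -> GoogleEmbeddingModelVersion.EMBEDDINGS_JUNE_2023 (fallback)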
class VertexAIEmbeddings(_VertexAICommon, Embeddings):
    """Google Cloud VertexAI embedding models."""

    # Instance context
    instance: Dict[str, Any] = {}  #: :meta private:

    model_config = ConfigDict(
        extra="forbid",
        protected_namespaces=(),
    )

    @model_validator(mode="after")
    def validate_environment(self) -> Self:
        """Validates that the python package exists in environment."""
        values = {
            "project": self.project,
            "location": self.location,
            "credentials": self.credentials,
            "api_transport": self.api_transport,
            "api_endpoint": self.api_endpoint,
            "default_metadata": self.default_metadata,
        }
        self._init_vertexai(values)
        _, user_agent = get_user_agent(f"{self.__class__.__name__}_{self.model_name}")
        with telemetry.tool_context_manager(user_agent):
            if (
                GoogleEmbeddingModelType(self.model_name)
                == GoogleEmbeddingModelType.MULTIMODAL
            ):
                self.client = MultiModalEmbeddingModel.from_pretrained(self.model_name)
            else:
                self.client = TextEmbeddingModel.from_pretrained(self.model_name)
        return self

    def __init__(
        self,
        model_name: Optional[str] = None,
        project: Optional[str] = None,
        location: str = "us-central1",
        request_parallelism: int = 5,
        max_retries: int = 6,
        credentials: Optional[Any] = None,
        **kwargs: Any,
    ):
        """Initialize the Vertex AI embeddings client."""
        if model_name:
            kwargs["model_name"] = model_name
        super().__init__(
            project=project,
            location=location,
            credentials=credentials,
            request_parallelism=request_parallelism,
            max_retries=max_retries,
            **kwargs,
        )
        self.instance["max_batch_size"] = kwargs.get("max_batch_size", _MAX_BATCH_SIZE)
        self.instance["batch_size"] = self.instance["max_batch_size"]
        self.instance["min_batch_size"] = kwargs.get("min_batch_size", _MIN_BATCH_SIZE)
        self.instance["min_good_batch_size"] = self.instance["min_batch_size"]
        self.instance["lock"] = threading.Lock()
        self.instance["batch_size_validated"] = False
        self.instance["task_executor"] = ThreadPoolExecutor(
            max_workers=request_parallelism
        )
        retry_errors: List[Type[BaseException]] = [
            ResourceExhausted,
            ServiceUnavailable,
            Aborted,
            DeadlineExceeded,
            InternalServerError,
        ]
        retry_decorator = create_base_retry_decorator(
            error_types=retry_errors, max_retries=self.max_retries
        )
        self.instance["get_embeddings_with_retry"] = retry_decorator(
            self.client.get_embeddings
        )

    @cached_property
    def _image_bytes_loader_client(self):
        return ImageBytesLoader(project=self.project)

    @property
    def model_type(self) -> str:
        return GoogleEmbeddingModelType(self.model_name)

    @property
    def model_version(self) -> GoogleEmbeddingModelVersion:
        return GoogleEmbeddingModelVersion(self.model_name)

    @staticmethod
    def _split_by_punctuation(text: str) -> List[str]:
        """Splits a string by punctuation and whitespace characters."""
        split_by = string.punctuation + "\t\n "
        pattern = f"([{split_by}])"
        # Using re.split to split the text based on the pattern
        return [segment for segment in re.split(pattern, text) if segment]

    @staticmethod
    def _prepare_batches(texts: List[str], batch_size: int) -> List[List[str]]:
        """Splits texts into batches based on the current maximum batch size
        and the maximum number of tokens per request.
"""text_index=0texts_len=len(texts)batch_token_len=0batches:List[List[str]]=[]current_batch:List[str]=[]iftexts_len==0:return[]whiletext_index<texts_len:current_text=texts[text_index]# Number of tokens per a text is conservatively estimated# as 2 times number of words, punctuation and whitespace characters.# Using `count_tokens` API will make batching too expensive.# Utilizing a tokenizer, would add a dependency that would not# necessarily be reused by the application using this class.current_text_token_cnt=(len(VertexAIEmbeddings._split_by_punctuation(current_text))*2)end_of_batch=Falseifcurrent_text_token_cnt>_MAX_TOKENS_PER_BATCH:# Current text is too big even for a single batch.# Such request will fail, but we still make a batch# so that the app can get the error from the API.iflen(current_batch)>0:# Adding current batch if not empty.batches.append(current_batch)current_batch=[current_text]text_index+=1end_of_batch=Trueelif(batch_token_len+current_text_token_cnt>_MAX_TOKENS_PER_BATCHorlen(current_batch)==batch_size):end_of_batch=Trueelse:iftext_index==texts_len-1:# Last element - even though the batch may be not big,# we still need to make it.end_of_batch=Truebatch_token_len+=current_text_token_cntcurrent_batch.append(current_text)text_index+=1ifend_of_batch:batches.append(current_batch)current_batch=[]batch_token_len=0returnbatchesdef_get_embeddings_with_retry(self,texts:List[str],embeddings_type:Optional[str]=None,dimensions:Optional[int]=None,)->List[List[float]]:"""Makes a Vertex AI model request with retry logic."""withtelemetry.tool_context_manager(self._user_agent):ifself.model_type==GoogleEmbeddingModelType.MULTIMODAL:returnself._get_multimodal_embeddings_with_retry(texts,dimensions)returnself._get_text_embeddings_with_retry(texts,embeddings_type=embeddings_type,output_dimensionality=dimensions)def_get_multimodal_embeddings_with_retry(self,texts:List[str],dimensions:Optional[int]=None)->List[List[float]]:tasks=[]fortextintexts:tasks.append(self.instance["task_executor"].submit(self.instance["get_embeddings_with_retry"],contextual_text=text,dimension=dimensions,))iflen(tasks)>0:wait(tasks)embeddings=[task.result().text_embeddingfortaskintasks]returnembeddingsdef_get_text_embeddings_with_retry(self,texts:List[str],embeddings_type:Optional[str]=None,output_dimensionality:Optional[int]=None,)->List[List[float]]:"""Makes a Vertex AI model request with retry logic."""ifembeddings_typeandself.model_version.task_type_supported:requests=[TextEmbeddingInput(text=t,task_type=embeddings_type)fortintexts]else:requests=textskwargs={}ifoutput_dimensionalityandself.model_version.output_dimensionality_supported:kwargs["output_dimensionality"]=output_dimensionalityembeddings=self.instance["get_embeddings_with_retry"](requests,**kwargs)return[embedding.valuesforembeddinginembeddings]def_prepare_and_validate_batches(self,texts:List[str],embeddings_type:Optional[str]=None)->Tuple[List[List[float]],List[List[str]]]:"""Prepares text batches with one-time validation of batch size. Batch size varies between GCP regions and individual project quotas. # Returns embeddings of the first text batch that went through, # and text batches for the rest of the texts. 
"""batches=VertexAIEmbeddings._prepare_batches(texts,self.instance["batch_size"])# If batch size if less or equal to one that went through before,# then keep batches as they are.iflen(batches[0])<=self.instance["min_good_batch_size"]:return[],batcheswithself.instance["lock"]:# If largest possible batch size was validated# while waiting for the lock, then check for rebuilding# our batches, and return.ifself.instance["batch_size_validated"]:iflen(batches[0])<=self.instance["batch_size"]:return[],batcheselse:return[],VertexAIEmbeddings._prepare_batches(texts,self.instance["batch_size"])# Figure out the largest possible batch size by trying to push# batches and lowering their size in half after every failure.first_batch=batches[0]first_result=[]had_failure=FalsewhileTrue:try:first_result=self._get_embeddings_with_retry(first_batch,embeddings_type)breakexceptInvalidArgument:had_failure=Truefirst_batch_len=len(first_batch)iffirst_batch_len==self.instance["min_batch_size"]:raisefirst_batch_len=max(self.instance["min_batch_size"],int(first_batch_len/2))first_batch=first_batch[:first_batch_len]first_batch_len=len(first_batch)self.instance["min_good_batch_size"]=max(self.instance["min_good_batch_size"],first_batch_len)# If had a failure and recovered# or went through with the max size, then it's a legit batch size.ifhad_failureorfirst_batch_len==self.instance["max_batch_size"]:self.instance["batch_size"]=first_batch_lenself.instance["batch_size_validated"]=True# If batch size was updated,# rebuild batches with the new batch size# (texts that went through are excluded here).iffirst_batch_len!=self.instance["max_batch_size"]:batches=VertexAIEmbeddings._prepare_batches(texts[first_batch_len:],self.instance["batch_size"])else:batches=batches[1:]else:# Still figuring out max batch size.batches=batches[1:]# Returning embeddings of the first text batch that went through,# and text batches for the rest of texts.returnfirst_result,batches
    def embed(
        self,
        texts: List[str],
        batch_size: int = 0,
        embeddings_task_type: Optional[EmbeddingTaskTypes] = None,
        dimensions: Optional[int] = None,
    ) -> List[List[float]]:
        """Embed a list of strings.

        Args:
            texts: List[str] The list of strings to embed.
            batch_size: [int] The batch size of embeddings to send to the model.
                If zero, then the largest batch size will be detected dynamically
                at the first request, starting from 250, down to 5.
            embeddings_task_type: [str] optional embeddings task type,
                one of the following
                    RETRIEVAL_QUERY - Text is a query
                        in a search/retrieval setting.
                    RETRIEVAL_DOCUMENT - Text is a document
                        in a search/retrieval setting.
                    SEMANTIC_SIMILARITY - Embeddings will be used
                        for Semantic Textual Similarity (STS).
                    CLASSIFICATION - Embeddings will be used for classification.
                    CLUSTERING - Embeddings will be used for clustering.
                    CODE_RETRIEVAL_QUERY - Embeddings will be used for
                        code retrieval for Java and Python.
                    The following are only supported on preview models:
                        QUESTION_ANSWERING
                        FACT_VERIFICATION
            dimensions: [int] optional. Output embeddings dimensions.
                Only supported on preview models.

        Returns:
            List of embeddings, one for each text.
        """
        if len(texts) == 0:
            return []
        embeddings: List[List[float]] = []
        first_batch_result: List[List[float]] = []
        if batch_size > 0:
            # Fixed batch size.
            batches = VertexAIEmbeddings._prepare_batches(texts, batch_size)
        else:
            # Dynamic batch size, starting from 250 at the first call.
            first_batch_result, batches = self._prepare_and_validate_batches(
                texts, embeddings_task_type
            )
        # First batch result may have some embeddings already.
        # In such case, batches have texts that were not processed yet.
        embeddings.extend(first_batch_result)
        tasks = []
        for batch in batches:
            tasks.append(
                self.instance["task_executor"].submit(
                    self._get_embeddings_with_retry,
                    texts=batch,
                    embeddings_type=embeddings_task_type,
                    dimensions=dimensions,
                )
            )
        if len(tasks) > 0:
            wait(tasks)
        for t in tasks:
            embeddings.extend(t.result())
        return embeddings
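    # Hedged usage sketch for `embed` (the model name and project below are
    # placeholders, not values defined in this module):
    #
    #     embedder = VertexAIEmbeddings(
    #         model_name="text-embedding-004", project="my-gcp-project"
    #     )
    #     vectors = embedder.embed(
    #         ["first text", "second text"],
    #         embeddings_task_type="SEMANTIC_SIMILARITY",
    #     )
    #     # len(vectors) == 2; each element is a list of floats.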
    def embed_documents(
        self,
        texts: List[str],
        batch_size: int = 0,
        *,
        embeddings_task_type: EmbeddingTaskTypes = "RETRIEVAL_DOCUMENT",
    ) -> List[List[float]]:
        """Embed a list of documents.

        Args:
            texts: List[str] The list of texts to embed.
            batch_size: [int] The batch size of embeddings to send to the model.
                If zero, then the largest batch size will be detected dynamically
                at the first request, starting from 250, down to 5.

        Returns:
            List of embeddings, one for each text.
        """
        return self.embed(texts, batch_size, embeddings_task_type)
    def embed_query(
        self,
        text: str,
        *,
        embeddings_task_type: EmbeddingTaskTypes = "RETRIEVAL_QUERY",
    ) -> List[float]:
        """Embed a text.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        return self.embed([text], 1, embeddings_task_type)[0]
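    # Illustrative retrieval-style usage of the two wrappers above, reusing the
    # `embedder` from the earlier sketch (texts are placeholders):
    # `embed_documents` defaults to RETRIEVAL_DOCUMENT and `embed_query` to
    # RETRIEVAL_QUERY, so the two sides use matching task types.
    #
    #     doc_vectors = embedder.embed_documents(["doc one", "doc two"])
    #     query_vector = embedder.embed_query("what is doc one about?")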
    @deprecated(
        since="2.0.1", removal="3.0.0", alternative="VertexAIEmbeddings.embed_images()"
    )
    def embed_image(
        self,
        image_path: str,
        contextual_text: Optional[str] = None,
        dimensions: Optional[int] = None,
    ) -> List[float]:
        """Embed an image.

        Args:
            image_path: Path to image (Google Cloud Storage or web) to generate
                embeddings for.
            contextual_text: Text to generate embeddings for.

        Returns:
            Embedding for the image.
        """
        warnings.warn(
            "The `embed_image()` API will be deprecated and replaced by "
            "`embed_images()`. Change your usage to "
            "`embed_images([image_path1, image_path2])` and note "
            "that the result returned will be a list of image embeddings."
        )
        if self.model_type != GoogleEmbeddingModelType.MULTIMODAL:
            raise NotImplementedError("Only supported for multimodal models")

        image_loader = self._image_bytes_loader_client
        bytes_image = image_loader.load_bytes(image_path)
        image = Image(bytes_image)
        result: MultiModalEmbeddingResponse = self.instance[
            "get_embeddings_with_retry"
        ](image=image, contextual_text=contextual_text, dimension=dimensions)
        return result.image_embedding
    def embed_images(
        self,
        uris: List[str],
        contextual_text: Optional[str] = None,
        dimensions: Optional[int] = None,
    ) -> List[List[float]]:
        """Embed a list of images.

        Args:
            uris: Paths to images (local, Google Cloud Storage or web) to generate
                embeddings for.
            contextual_text: Text to generate embeddings for.

        Returns:
            List of embeddings, one for each image.
        """
        if self.model_type != GoogleEmbeddingModelType.MULTIMODAL:
            raise NotImplementedError("Only supported for multimodal models")

        image_loader = self._image_bytes_loader_client
        embeddings = []
        for image_path in uris:
            bytes_image = image_loader.load_bytes(image_path)
            image = Image(bytes_image)
            result: MultiModalEmbeddingResponse = self.instance[
                "get_embeddings_with_retry"
            ](image=image, contextual_text=contextual_text, dimension=dimensions)
            embeddings.append(result.image_embedding)
        return embeddings
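# Hedged multimodal sketch (the model name and GCS URIs below are placeholders;
# a multimodal embedding model is required, otherwise `embed_images` raises
# NotImplementedError as shown above):
#
#     mm_embedder = VertexAIEmbeddings(model_name="multimodalembedding@001")
#     image_vectors = mm_embedder.embed_images(
#         ["gs://my-bucket/cat.png", "gs://my-bucket/dog.png"],
#         contextual_text="pet photos",
#     )
#     # one embedding per URI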