Source code for elasticsearch.helpers.vectorstore._sync.strategies
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from elasticsearch import Elasticsearch
from elasticsearch.helpers.vectorstore._sync._utils import model_must_be_deployed
from elasticsearch.helpers.vectorstore._utils import DistanceMetric


class RetrievalStrategy(ABC):
    @abstractmethod
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Returns the Elasticsearch query body for the given parameters.
        The store will execute the query.

        :param query: The text query. Can be None if query_vector is given.
        :param k: The total number of results to retrieve.
        :param num_candidates: The number of results to fetch initially in knn search.
        :param filter: List of filter clauses to apply to the query.
        :param query_vector: The query vector. Can be None if a query string is given.

        :return: The Elasticsearch query body.
        """

    @abstractmethod
    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Create the required index and do necessary preliminary work, like
        creating inference pipelines or checking if a required model was deployed.

        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        :param num_dimensions: If vectors are indexed, how many dimensions do they have.

        :return: Dictionary with field and field type pairs that describe the schema.
        """

    def before_index_creation(
        self, *, client: Elasticsearch, text_field: str, vector_field: str
    ) -> None:
        """
        Executes before the index is created. Used for setting up any required
        Elasticsearch resources like a pipeline. Defaults to a no-op.

        :param client: The Elasticsearch client.
        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        """
        pass

    def needs_inference(self) -> bool:
        """
        Some retrieval strategies index embedding vectors and allow search by embedding
        vector, for example the `DenseVectorStrategy` strategy. Mapping a user input query
        string to an embedding vector is called inference. Inference can be applied
        in Elasticsearch (using a `model_id`) or outside of Elasticsearch (using an
        `EmbeddingService` defined on the `VectorStore`). In the latter case,
        this method has to return True.
        """
        return False
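

# Illustrative sketch, not part of this module: a hypothetical strategy that relies on
# an external `EmbeddingService` (configured on the `VectorStore`) instead of an
# Elasticsearch-hosted model would override `needs_inference` to return True, so the
# store embeds the query text itself and passes the resulting `query_vector` to
# `es_query`. All names and field values below are assumptions for the example.
class _ExampleExternalEmbeddingStrategy(RetrievalStrategy):
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        # The store has already embedded `query` into `query_vector` for us.
        return {
            "knn": {
                "filter": filter,
                "field": vector_field,
                "query_vector": query_vector,
                "k": k,
                "num_candidates": num_candidates,
            }
        }

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        # Plain dense_vector mapping; no ingest pipeline is needed because the
        # embedding happens outside of Elasticsearch.
        return {
            "properties": {
                vector_field: {"type": "dense_vector", "dims": num_dimensions}
            }
        }, {}

    def needs_inference(self) -> bool:
        return True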


class SparseVectorStrategy(RetrievalStrategy):
    """Sparse retrieval strategy using the `text_expansion` processor."""

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        if query_vector:
            raise ValueError(
                "Cannot do sparse retrieval with a query_vector. "
                "Inference is currently always applied in Elasticsearch."
            )
        if query is None:
            raise ValueError("please specify a query string")

        return {
            "query": {
                "bool": {
                    "must": [
                        {
                            "text_expansion": {
                                f"{vector_field}.{self._tokens_field}": {
                                    "model_id": self.model_id,
                                    "model_text": query,
                                }
                            }
                        }
                    ],
                    "filter": filter,
                }
            }
        }

    def before_index_creation(
        self, *, client: Elasticsearch, text_field: str, vector_field: str
    ) -> None:
        if self.model_id:
            model_must_be_deployed(client, self.model_id)

            # Create a pipeline for the model
            client.ingest.put_pipeline(
                id=self._pipeline_name,
                description="Embedding pipeline for Python VectorStore",
                processors=[
                    {
                        "inference": {
                            "model_id": self.model_id,
                            "target_field": vector_field,
                            "field_map": {text_field: "text_field"},
                            "inference_config": {
                                "text_expansion": {"results_field": self._tokens_field}
                            },
                        }
                    }
                ],
            )
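

# Illustrative sketch, not part of this module: the query body that
# `SparseVectorStrategy.es_query` builds for a text query. The constructor is not shown
# in this excerpt; the call below assumes its defaults (an ELSER model id and a tokens
# sub-field), and the field names are made up for the example.
def _example_sparse_query_body() -> Dict[str, Any]:
    strategy = SparseVectorStrategy()
    return strategy.es_query(
        query="what is a sparse vector?",
        query_vector=None,
        text_field="text_field",
        vector_field="vector_field",
        k=5,
        num_candidates=50,
    )
    # The result is roughly:
    # {"query": {"bool": {"must": [{"text_expansion": {
    #     "vector_field.<tokens_field>": {"model_id": "<model_id>",
    #                                     "model_text": "what is a sparse vector?"}}}],
    #     "filter": []}}}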


class DenseVectorStrategy(RetrievalStrategy):
    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
        hybrid: bool = False,
        rrf: Union[bool, Dict[str, Any]] = True,
        text_field: Optional[str] = "text_field",
    ):
        if hybrid and not text_field:
            raise ValueError(
                "to enable hybrid you have to specify a text_field (for BM25Strategy matching)"
            )

        self.distance = distance
        self.model_id = model_id
        self.hybrid = hybrid
        self.rrf = rrf
        self.text_field = text_field

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        knn = {
            "filter": filter,
            "field": vector_field,
            "k": k,
            "num_candidates": num_candidates,
        }

        if query_vector is not None:
            knn["query_vector"] = query_vector
        else:
            # Inference in Elasticsearch. When initializing we make sure to always have
            # a model_id if we don't have an embedding_service.
            knn["query_vector_builder"] = {
                "text_embedding": {
                    "model_id": self.model_id,
                    "model_text": query,
                }
            }

        if self.hybrid:
            return self._hybrid(query=cast(str, query), knn=knn, filter=filter)

        return {"knn": knn}

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        if self.distance is DistanceMetric.COSINE:
            similarity = "cosine"
        elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
            similarity = "l2_norm"
        elif self.distance is DistanceMetric.DOT_PRODUCT:
            similarity = "dot_product"
        elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
            similarity = "max_inner_product"
        else:
            raise ValueError(f"Similarity {self.distance} not supported.")

        mappings: Dict[str, Any] = {
            "properties": {
                vector_field: {
                    "type": "dense_vector",
                    "dims": num_dimensions,
                    "index": True,
                    "similarity": similarity,
                },
            }
        }

        return mappings, {}

    def _hybrid(
        self, query: str, knn: Dict[str, Any], filter: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        # Add a query to the knn query.
        # RRF is used to even the score from the knn query and text query
        # RRF has two optional parameters: {'rank_constant': int, 'rank_window_size': int}
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
        standard_query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                self.text_field: {
                                    "query": query,
                                }
                            }
                        }
                    ],
                    "filter": filter,
                }
            }
        }

        if self.rrf is False:
            query_body = {
                "knn": knn,
                **standard_query,
            }
        else:
            rrf_options = {}
            if isinstance(self.rrf, Dict):
                if "rank_constant" in self.rrf:
                    rrf_options["rank_constant"] = self.rrf["rank_constant"]
                if "window_size" in self.rrf:
                    # 'window_size' was renamed to 'rank_window_size', but we support
                    # the older name for backwards compatibility
                    rrf_options["rank_window_size"] = self.rrf["window_size"]
                if "rank_window_size" in self.rrf:
                    rrf_options["rank_window_size"] = self.rrf["rank_window_size"]
            query_body = {
                "retriever": {
                    "rrf": {
                        "retrievers": [
                            {"standard": standard_query},
                            {"knn": knn},
                        ],
                        **rrf_options,
                    },
                },
            }
        return query_body
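

# Illustrative sketch, not part of this module: building a hybrid query with
# `DenseVectorStrategy`. The model id, field names, and RRF parameters below are
# assumptions for the example.
def _example_hybrid_query_body() -> Dict[str, Any]:
    strategy = DenseVectorStrategy(
        model_id=".multilingual-e5-small",  # any Elasticsearch-hosted text embedding model
        hybrid=True,
        rrf={"rank_constant": 60, "rank_window_size": 100},
    )
    body = strategy.es_query(
        query="hybrid search",
        query_vector=None,
        text_field="text_field",
        vector_field="vector_field",
        k=5,
        num_candidates=50,
    )
    # `body` combines a BM25 `standard` retriever over `text_field` with a `knn`
    # retriever over `vector_field` under a single RRF clause, roughly:
    # {"retriever": {"rrf": {"retrievers": [{"standard": {...}}, {"knn": {...}}],
    #                        "rank_constant": 60, "rank_window_size": 100}}}
    return body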