class SearchType(str, enum.Enum):
    """Search strategies that can be selected when querying the store.

    Members subclass ``str``, so each one compares equal to its plain
    string value (``SearchType.VECTOR == "vector"``).
    """

    # Similarity lookup against the vector index only.
    VECTOR = "vector"
    # Vector index lookup combined with a fulltext (keyword) index lookup.
    HYBRID = "hybrid"


# Strategy applied when a caller does not choose one explicitly.
DEFAULT_SEARCH_TYPE = SearchType.VECTOR
class IndexType(str, enum.Enum):
    """Kinds of graph entity a vector index can be defined on.

    Members subclass ``str``, so each one compares equal to its plain
    string value (``IndexType.NODE == "NODE"``).
    """

    # Index defined over node properties.
    NODE = "NODE"
    # Index defined over relationship properties.
    RELATIONSHIP = "RELATIONSHIP"
# Index type assumed when a caller does not specify one.
DEFAULT_INDEX_TYPE = IndexType.NODE


def _get_search_index_query(
    search_type: SearchType, index_type: IndexType = DEFAULT_INDEX_TYPE
) -> str:
    """Return the Cypher prefix that looks up candidates in the index.

    The returned text only yields the matched entity and its score; the
    caller is expected to append a ``RETURN`` clause.

    Args:
        search_type: Strategy to use for node indexes. It is ignored for
            any other index type, which always gets a plain vector lookup.
        index_type: Kind of entity the vector index is defined on.

    Returns:
        A Cypher fragment that references the ``$index``, ``$k`` and
        ``$embedding`` parameters (plus ``$keyword_index`` and ``$query``
        for the hybrid strategy).

    Raises:
        KeyError: If ``index_type`` is ``NODE`` and ``search_type`` is not a
            known ``SearchType`` member.
    """
    if index_type != IndexType.NODE:
        return (
            "CALL db.index.vector.queryRelationships($index, $k, $embedding) "
            "YIELD relationship, score "
        )

    vector_only = (
        "CALL db.index.vector.queryNodes($index, $k, $embedding) "
        "YIELD node, score "
    )
    # Both sub-queries divide every score by the best score of that
    # sub-query before the union; the outer WITH then keeps the highest
    # score per node so a node found by both appears once.
    vector_and_keyword = (
        "CALL { "
        "CALL db.index.vector.queryNodes($index, $k, $embedding) "
        "YIELD node, score "
        "WITH collect({node:node, score:score}) AS nodes, max(score) AS max "
        "UNWIND nodes AS n "
        # We use 0 as min
        "RETURN n.node AS node, (n.score / max) AS score UNION "
        "CALL db.index.fulltext.queryNodes($keyword_index, $query, "
        "{limit: $k}) YIELD node, score "
        "WITH collect({node:node, score:score}) AS nodes, max(score) AS max "
        "UNWIND nodes AS n "
        # We use 0 as min
        "RETURN n.node AS node, (n.score / max) AS score "
        "} "
        # dedup
        "WITH node, max(score) AS score ORDER BY score DESC LIMIT $k "
    )
    queries_by_search_type = {
        SearchType.VECTOR: vector_only,
        SearchType.HYBRID: vector_and_keyword,
    }
    return queries_by_search_type[search_type]
def check_if_not_null(props: List[str], values: List[Any]) -> None:
    """Ensure every value is truthy, naming the first offender otherwise.

    Names and values are paired positionally. Any falsy value is rejected,
    which covers ``None`` and the empty string.

    Args:
        props: Parameter names, used only for the error message.
        values: Values to validate, in the same order as ``props``.

    Raises:
        ValueError: For the first value that is falsy.
    """
    for name, candidate in zip(props, values):
        if candidate:
            continue
        raise ValueError(f"Parameter `{name}` must not be None or empty string")
def sort_by_index_name(
    lst: List[Dict[str, Any]], index_name: str
) -> List[Dict[str, Any]]:
    """Move entries named ``index_name`` to the front of the list.

    The relative order inside the matching group and inside the
    non-matching group is kept. Entries without a ``"name"`` key are
    treated as non-matching. The input list is not modified.

    Args:
        lst: Index descriptions, each a dictionary that may carry a
            ``"name"`` key.
        index_name: Name that should be prioritised.

    Returns:
        A new list with the matching entries first.
    """
    matching: List[Dict[str, Any]] = []
    others: List[Dict[str, Any]] = []
    for entry in lst:
        bucket = others if entry.get("name") != index_name else matching
        bucket.append(entry)
    return matching + others
def remove_lucene_chars(text: str) -> str:
    """Replace Lucene query-syntax characters with spaces.

    Every special character is turned into a single space, then leading
    and trailing whitespace is stripped. Characters are replaced rather
    than deleted so that words separated only by a special character stay
    separate tokens.

    Args:
        text: Raw user text that will be used as a fulltext query.

    Returns:
        The text with all special characters replaced by spaces.
    """
    special_chars = [
        "+",
        "-",
        "&",
        "|",
        "!",
        "(",
        ")",
        "{",
        "}",
        "[",
        "]",
        "^",
        '"',
        "~",
        "*",
        "?",
        ":",
        "\\",
        # "/" delimits regular expressions in the Lucene query syntax, so an
        # unbalanced slash in user text makes the query unparsable.
        "/",
    ]
    # One translate pass replaces the chain of per-character replace() calls.
    replacements = str.maketrans({char: " " for char in special_chars})
    return text.translate(replacements).strip()
def dict_to_yaml_str(input_dict: Dict, indent: int = 0) -> str:
    """Render a dictionary as a YAML-like string using only built-ins.

    Nested dictionaries are rendered recursively one indentation level
    deeper. List items are written as ``- item`` lines at the same
    indentation as their key, using each item's ``str()`` form. Any other
    value is written as ``key: value``.

    Parameters:
    - input_dict (dict): The dictionary to convert.
    - indent (int): The current indentation level.

    Returns:
    - str: The YAML-like string representation of the input dictionary.
    """
    pad = " " * indent
    pieces = []
    for key, value in input_dict.items():
        if isinstance(value, dict):
            pieces.append(f"{pad}{key}:\n")
            pieces.append(dict_to_yaml_str(value, indent + 1))
        elif isinstance(value, list):
            pieces.append(f"{pad}{key}:\n")
            pieces.extend(f"{pad}- {item}\n" for item in value)
        else:
            pieces.append(f"{pad}{key}: {value}\n")
    return "".join(pieces)
def combine_queries(
    input_queries: List[Tuple[str, Dict[str, Any]]], operator: str
) -> Tuple[str, Dict[str, Any]]:
    """Combine multiple query fragments with a boolean operator.

    Each parameter is renamed to ``<name>_<n>``, where ``n`` counts how
    often that name has been seen across all fragments, so fragments that
    reuse a parameter name do not overwrite each other. The placeholders in
    each fragment are rewritten to match.

    Args:
        input_queries: Pairs of (query fragment, parameters used by it).
        operator: Text placed between the parenthesised fragments, e.g.
            ``"AND"`` or ``"OR"``.

    Returns:
        A tuple of the combined query string and the merged, renamed
        parameters. An empty input produces ``("", {})``.
    """
    import re

    combined_params: Dict[str, Any] = {}
    param_counter: Dict[str, int] = {}
    wrapped_fragments: List[str] = []

    for query, params in input_queries:
        renamed: Dict[str, str] = {}
        for param, value in params.items():
            # Update the parameter name to ensure uniqueness
            param_counter[param] = param_counter.get(param, 0) + 1
            new_param_name = f"{param}_{param_counter[param]}"
            renamed[param] = new_param_name
            combined_params[new_param_name] = value

        # Rewrite whole placeholders in a single pass. A plain substring
        # replace of "$param_1" would also rewrite the start of "$param_10",
        # and chained replaces could rewrite an already renamed placeholder.
        new_query = re.sub(
            r"\$(\w+)",
            lambda match: "$" + renamed.get(match.group(1), match.group(1)),
            query,
        )
        wrapped_fragments.append(f"({new_query})")

    return f" {operator} ".join(wrapped_fragments), combined_params
def collect_params(
    input_data: List[Tuple[str, Dict[str, str]]],
) -> Tuple[List[str], Dict[str, Any]]:
    """Split (snippet, parameters) pairs into two aggregated collections.

    Args:
        input_data: Pairs of a query snippet and the parameters it uses.

    Returns:
        A tuple of the snippets in input order and a single dictionary
        holding every parameter. When the same parameter name occurs in
        several pairs, the value from the last pair wins.
    """
    snippets: List[str] = []
    merged: Dict[str, Any] = {}
    for snippet, snippet_params in input_data:
        snippets.append(snippet)
        merged.update(snippet_params)
    return snippets, merged
def _handle_field_filter(
    field: str, value: Any, param_number: int = 1
) -> Tuple[str, Dict]:
    """Create a filter for a specific field.

    Args:
        field: name of field
        value: value to filter
            If provided as is then this will be an equality filter
            If provided as a dictionary then this will be a filter, the key
            will be the operator and the value will be the value to filter by
        param_number: sequence number of parameters used to map between param
            dict and Cypher snippet

    Returns a tuple of
        - Cypher filter snippet
        - Dictionary with parameters used in filter snippet

    Raises:
        ValueError: If ``field`` is not a string, looks like an operator, is
            not a valid identifier, or if ``value`` is a dictionary that does
            not consist of exactly one supported operator.
        NotImplementedError: If an ``$in``/``$nin`` list holds a value that
            is not a str, int or float, or if the operator is supported but
            has no translation here.
    """
    if not isinstance(field, str):
        raise ValueError(
            f"field should be a string but got: {type(field)} with value: {field}"
        )

    if field.startswith("$"):
        raise ValueError(
            f"Invalid filter condition. Expected a field but got an operator: "
            f"{field}"
        )

    # The field name is interpolated into the Cypher text (inside backticks),
    # so only identifiers are accepted. This also disallows $ for now, until
    # escape characters are supported.
    if not field.isidentifier():
        raise ValueError(f"Invalid field name: {field}. Expected a valid identifier.")

    if isinstance(value, dict):
        # This is a filter specification
        if len(value) != 1:
            raise ValueError(
                "Invalid filter condition. Expected a value which "
                "is a dictionary with a single key that corresponds to an operator "
                f"but got a dictionary with {len(value)} keys. The first few "
                f"keys are: {list(value.keys())[:3]}"
            )
        operator, filter_value = list(value.items())[0]
        # Verify that the operator is a known one
        if operator not in SUPPORTED_OPERATORS:
            raise ValueError(
                f"Invalid operator: {operator}. "
                f"Expected one of {SUPPORTED_OPERATORS}"
            )
    else:
        # Then we assume an equality operator
        operator = "$eq"
        filter_value = value

    param_name = f"param_{param_number}"

    if operator in COMPARISONS_TO_NATIVE:
        # native is trusted input: it comes from the module's own mapping,
        # never from the caller.
        native = COMPARISONS_TO_NATIVE[operator]
        return (f"n.`{field}` {native} ${param_name}", {param_name: filter_value})

    if operator == "$between":
        low, high = filter_value
        return (
            f"${param_name}_low <= n.`{field}` <= ${param_name}_high",
            {f"{param_name}_low": low, f"{param_name}_high": high},
        )

    if operator in {"$in", "$nin"}:
        for val in filter_value:
            if not isinstance(val, (str, int, float)):
                raise NotImplementedError(
                    f"Unsupported type: {type(val)} for value: {val}"
                )
        membership = "IN" if operator == "$in" else "NOT IN"
        return (f"n.`{field}` {membership} ${param_name}", {param_name: filter_value})

    if operator == "$like":
        # NOTE(review): only trailing "%" wildcards are removed; a leading "%"
        # is matched literally by CONTAINS - confirm that this is intended.
        return (
            f"n.`{field}` CONTAINS ${param_name}",
            {param_name: filter_value.rstrip("%")},
        )

    if operator == "$ilike":
        # Lower-case both sides. Lower-casing only the stored value would
        # make any search text containing an upper-case letter unmatchable.
        return (
            f"toLower(n.`{field}`) CONTAINS toLower(${param_name})",
            {param_name: filter_value.rstrip("%")},
        )

    raise NotImplementedError()
def construct_metadata_filter(filter: Dict[str, Any]) -> Tuple[str, Dict]:
    """Construct a metadata filter.

    A dictionary with a single ``$and``/``$or`` key (case-insensitive) must
    map to a non-empty list of nested filters, which are built recursively
    and joined with that operator. A dictionary with a single field key is
    turned into one field condition. A dictionary with several keys must
    contain fields only; their conditions are joined with ``AND``.

    Args:
        filter: A dictionary representing the filter condition.

    Returns:
        Tuple[str, Dict]: The Cypher filter snippet and the parameters it
        references.

    Raises:
        ValueError: If ``filter`` is not a dictionary, is empty, uses an
            operator other than ``$and``/``$or`` at this level, gives such
            an operator something other than a non-empty list, or mixes
            operators with fields.
    """
    if not isinstance(filter, dict):
        # Without this check the function would fall through and return
        # None, which only fails later with an unrelated unpacking error.
        raise ValueError(
            f"Expected a dictionary of filters, but got {type(filter)} "
            f"for value: {filter}"
        )
    if not filter:
        raise ValueError("Got an empty dictionary for filters.")

    if len(filter) == 1:
        key, value = next(iter(filter.items()))
        if not key.startswith("$"):
            # Then it's a field
            return _handle_field_filter(key, value)

        # The only operators allowed at the top level are $and and $or
        logical = key.lower()
        if logical not in ("$and", "$or"):
            raise ValueError(
                f"Invalid filter condition. Expected $and or $or " f"but got: {key}"
            )
        if not isinstance(value, list):
            raise ValueError(
                f"Expected a list, but got {type(value)} for value: {value}"
            )
        if not value:
            # An empty list would combine into an empty snippet and produce
            # invalid Cypher, so it is rejected here.
            raise ValueError(
                "Invalid filter condition. Expected a non-empty list of "
                f"filters for {key} but got an empty list"
            )
        return combine_queries(
            [construct_metadata_filter(el) for el in value],
            "AND" if logical == "$and" else "OR",
        )

    # Several keys: all of them have to be fields (they cannot be operators)
    for key in filter:
        if key.startswith("$"):
            raise ValueError(
                f"Invalid filter condition. Expected a field but got: {key}"
            )
    # The position of each field is used as its parameter number so that the
    # generated parameter names do not collide.
    snippets, params = collect_params(
        [
            _handle_field_filter(k, v, index)
            for index, (k, v) in enumerate(filter.items())
        ]
    )
    return " AND ".join(snippets), params
[docs]classNeo4jVector(VectorStore):"""`Neo4j` vector index. To use, you should have the ``neo4j`` python package installed. Args: url: Neo4j connection url username: Neo4j username. password: Neo4j password database: Optionally provide Neo4j database Defaults to "neo4j" embedding: Any embedding function implementing `langchain.embeddings.base.Embeddings` interface. distance_strategy: The distance strategy to use. (default: COSINE) pre_delete_collection: If True, will delete existing data if it exists. (default: False). Useful for testing. Example: .. code-block:: python from langchain_community.vectorstores.neo4j_vector import Neo4jVector from langchain_community.embeddings.openai import OpenAIEmbeddings url="bolt://localhost:7687" username="neo4j" password="pleaseletmein" embeddings = OpenAIEmbeddings() vectorestore = Neo4jVector.from_documents( embedding=embeddings, documents=docs, url=url username=username, password=password, ) """
[docs]def__init__(self,embedding:Embeddings,*,search_type:SearchType=SearchType.VECTOR,username:Optional[str]=None,password:Optional[str]=None,url:Optional[str]=None,keyword_index_name:Optional[str]="keyword",database:Optional[str]=None,index_name:str="vector",node_label:str="Chunk",embedding_node_property:str="embedding",text_node_property:str="text",distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,logger:Optional[logging.Logger]=None,pre_delete_collection:bool=False,retrieval_query:str="",relevance_score_fn:Optional[Callable[[float],float]]=None,index_type:IndexType=DEFAULT_INDEX_TYPE,graph:Optional[Neo4jGraph]=None,)->None:try:importneo4jexceptImportError:raiseImportError("Could not import neo4j python package. ""Please install it with `pip install neo4j`.")# Allow only cosine and euclidean distance strategiesifdistance_strategynotin[DistanceStrategy.EUCLIDEAN_DISTANCE,DistanceStrategy.COSINE,]:raiseValueError("distance_strategy must be either 'EUCLIDEAN_DISTANCE' or 'COSINE'")# Graph object takes precedent over env or input paramsifgraph:self._driver=graph._driverself._database=graph._databaseelse:# Handle if the credentials are environment variables# Support URL for backwards compatibilityifnoturl:url=os.environ.get("NEO4J_URL")url=get_from_dict_or_env({"url":url},"url","NEO4J_URI")username=get_from_dict_or_env({"username":username},"username","NEO4J_USERNAME")password=get_from_dict_or_env({"password":password},"password","NEO4J_PASSWORD")database=get_from_dict_or_env({"database":database},"database","NEO4J_DATABASE","neo4j")self._driver=neo4j.GraphDatabase.driver(url,auth=(username,password))self._database=database# Verify connectiontry:self._driver.verify_connectivity()exceptneo4j.exceptions.ServiceUnavailable:raiseValueError("Could not connect to Neo4j database. ""Please ensure that the url is correct")exceptneo4j.exceptions.AuthError:raiseValueError("Could not connect to Neo4j database. 
""Please ensure that the username and password are correct")self.schema=""# Verify if the version support vector indexself._is_enterprise=Falseself.verify_version()# Verify that required values are not nullcheck_if_not_null(["index_name","node_label","embedding_node_property","text_node_property",],[index_name,node_label,embedding_node_property,text_node_property],)self.embedding=embeddingself._distance_strategy=distance_strategyself.index_name=index_nameself.keyword_index_name=keyword_index_nameself.node_label=node_labelself.embedding_node_property=embedding_node_propertyself.text_node_property=text_node_propertyself.logger=loggerorlogging.getLogger(__name__)self.override_relevance_score_fn=relevance_score_fnself.retrieval_query=retrieval_queryself.search_type=search_typeself._index_type=index_type# Calculate embedding dimensionself.embedding_dimension=len(embedding.embed_query("foo"))# Delete existing data if flaggedifpre_delete_collection:fromneo4j.exceptionsimportDatabaseErrorself.query(f"MATCH (n:`{self.node_label}`) ""CALL { WITH n DETACH DELETE n } ""IN TRANSACTIONS OF 10000 ROWS;")# Delete indextry:self.query(f"DROP INDEX {self.index_name}")exceptDatabaseError:# Index didn't exist yetpass
[docs]defquery(self,query:str,*,params:Optional[dict]=None,retry_on_session_expired:bool=True,)->List[Dict[str,Any]]:""" This method sends a Cypher query to the connected Neo4j database and returns the results as a list of dictionaries. Args: query (str): The Cypher query to execute. params (dict, optional): Dictionary of query parameters. Defaults to {}. Returns: List[Dict[str, Any]]: List of dictionaries containing the query results. """fromneo4j.exceptionsimportCypherSyntaxError,SessionExpiredparams=paramsor{}withself._driver.session(database=self._database)assession:try:data=session.run(query,params)return[r.data()forrindata]exceptCypherSyntaxErrorase:raiseValueError(f"Cypher Statement is not valid\n{e}")except(SessionExpired)ase:# Session expired is a transient error that can be retriedifretry_on_session_expired:returnself.query(query,params=params,retry_on_session_expired=False)else:raisee
[docs]defverify_version(self)->None:""" Check if the connected Neo4j database version supports vector indexing. Queries the Neo4j database to retrieve its version and compares it against a target version (5.11.0) that is known to support vector indexing. Raises a ValueError if the connected Neo4j version is not supported. """db_data=self.query("CALL dbms.components()")version=db_data[0]["versions"][0]if"aura"inversion:version_tuple=tuple(map(int,version.split("-")[0].split(".")))+(0,)else:version_tuple=tuple(map(int,version.split(".")))target_version=(5,11,0)ifversion_tuple<target_version:raiseValueError("Version index is only supported in Neo4j version 5.11 or greater")# Flag for metadata filteringmetadata_target_version=(5,18,0)ifversion_tuple<metadata_target_version:self.support_metadata_filter=Falseelse:self.support_metadata_filter=True# Flag for enterpriseself._is_enterprise=Trueifdb_data[0]["edition"]=="enterprise"elseFalse
[docs]defretrieve_existing_index(self)->Tuple[Optional[int],Optional[str]]:""" Check if the vector index exists in the Neo4j database and returns its embedding dimension. This method queries the Neo4j database for existing indexes and attempts to retrieve the dimension of the vector index with the specified name. If the index exists, its dimension is returned. If the index doesn't exist, `None` is returned. Returns: int or None: The embedding dimension of the existing index if found. """index_information=self.query("SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, ""properties, options WHERE type = 'VECTOR' AND (name = $index_name ""OR (labelsOrTypes[0] = $node_label AND ""properties[0] = $embedding_node_property)) ""RETURN name, entityType, labelsOrTypes, properties, options ",params={"index_name":self.index_name,"node_label":self.node_label,"embedding_node_property":self.embedding_node_property,},)# sort by index_nameindex_information=sort_by_index_name(index_information,self.index_name)try:self.index_name=index_information[0]["name"]self.node_label=index_information[0]["labelsOrTypes"][0]self.embedding_node_property=index_information[0]["properties"][0]self._index_type=index_information[0]["entityType"]embedding_dimension=Noneindex_config=index_information[0]["options"]["indexConfig"]if"vector.dimensions"inindex_config:embedding_dimension=index_config["vector.dimensions"]returnembedding_dimension,index_information[0]["entityType"]exceptIndexError:returnNone,None
[docs]defretrieve_existing_fts_index(self,text_node_properties:List[str]=[])->Optional[str]:""" Check if the fulltext index exists in the Neo4j database This method queries the Neo4j database for existing fts indexes with the specified name. Returns: (Tuple): keyword index information """index_information=self.query("SHOW INDEXES YIELD name, type, labelsOrTypes, properties, options ""WHERE type = 'FULLTEXT' AND (name = $keyword_index_name ""OR (labelsOrTypes = [$node_label] AND ""properties = $text_node_property)) ""RETURN name, labelsOrTypes, properties, options ",params={"keyword_index_name":self.keyword_index_name,"node_label":self.node_label,"text_node_property":text_node_propertiesor[self.text_node_property],},)# sort by index_nameindex_information=sort_by_index_name(index_information,self.index_name)try:self.keyword_index_name=index_information[0]["name"]self.text_node_property=index_information[0]["properties"][0]node_label=index_information[0]["labelsOrTypes"][0]returnnode_labelexceptIndexError:returnNone
[docs]defcreate_new_index(self)->None:""" This method constructs a Cypher query and executes it to create a new vector index in Neo4j. """index_query=("CALL db.index.vector.createNodeIndex(""$index_name,""$node_label,""$embedding_node_property,""toInteger($embedding_dimension),""$similarity_metric )")parameters={"index_name":self.index_name,"node_label":self.node_label,"embedding_node_property":self.embedding_node_property,"embedding_dimension":self.embedding_dimension,"similarity_metric":DISTANCE_MAPPING[self._distance_strategy],}self.query(index_query,params=parameters)
[docs]defcreate_new_keyword_index(self,text_node_properties:List[str]=[])->None:""" This method constructs a Cypher query and executes it to create a new full text index in Neo4j. """node_props=text_node_propertiesor[self.text_node_property]fts_index_query=(f"CREATE FULLTEXT INDEX {self.keyword_index_name} "f"FOR (n:`{self.node_label}`) ON EACH "f"[{', '.join(['n.`'+el+'`'forelinnode_props])}]")self.query(fts_index_query)
@propertydefembeddings(self)->Embeddings:returnself.embedding@classmethoddef__from(cls,texts:List[str],embeddings:List[List[float]],embedding:Embeddings,metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,create_id_index:bool=True,search_type:SearchType=SearchType.VECTOR,**kwargs:Any,)->Neo4jVector:ifidsisNone:ids=[md5(text.encode("utf-8")).hexdigest()fortextintexts]ifnotmetadatas:metadatas=[{}for_intexts]store=cls(embedding=embedding,search_type=search_type,**kwargs,)# Check if the vector index already existsembedding_dimension,index_type=store.retrieve_existing_index()# Raise error if relationship index typeifindex_type=="RELATIONSHIP":raiseValueError("Data ingestion is not supported with relationship vector index.")# If the vector index doesn't exist yetifnotindex_type:store.create_new_index()# If the index already exists, check if embedding dimensions matchelif(embedding_dimensionandnotstore.embedding_dimension==embedding_dimension):raiseValueError(f"Index with name {store.index_name} already exists.""The provided embedding function and vector index ""dimensions do not match.\n"f"Embedding function dimension: {store.embedding_dimension}\n"f"Vector index dimension: {embedding_dimension}")ifsearch_type==SearchType.HYBRID:fts_node_label=store.retrieve_existing_fts_index()# If the FTS index doesn't exist yetifnotfts_node_label:store.create_new_keyword_index()else:# Validate that FTS and Vector index use the same informationifnotfts_node_label==store.node_label:raiseValueError("Vector and keyword index don't index the same node label")# Create unique constraint for faster importifcreate_id_index:store.query("CREATE CONSTRAINT IF NOT EXISTS "f"FOR (n:`{store.node_label}`) REQUIRE n.id IS UNIQUE;")store.add_embeddings(texts=texts,embeddings=embeddings,metadatas=metadatas,ids=ids,**kwargs)returnstore
[docs]defadd_embeddings(self,texts:Iterable[str],embeddings:List[List[float]],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,**kwargs:Any,)->List[str]:"""Add embeddings to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. embeddings: List of list of embedding vectors. metadatas: List of metadatas associated with the texts. kwargs: vectorstore specific parameters """ifidsisNone:ids=[md5(text.encode("utf-8")).hexdigest()fortextintexts]ifnotmetadatas:metadatas=[{}for_intexts]import_query=("UNWIND $data AS row ""CALL { WITH row "f"MERGE (c:`{self.node_label}` {{id: row.id}}) ""WITH c, row "f"CALL db.create.setNodeVectorProperty(c, "f"'{self.embedding_node_property}', row.embedding) "f"SET c.`{self.text_node_property}` = row.text ""SET c += row.metadata ""} IN TRANSACTIONS OF 1000 ROWS ")parameters={"data":[{"text":text,"metadata":metadata,"embedding":embedding,"id":id}fortext,metadata,embedding,idinzip(texts,metadatas,embeddings,ids)]}self.query(import_query,params=parameters)returnids
[docs]defadd_texts(self,texts:Iterable[str],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,**kwargs:Any,)->List[str]:"""Run more texts through the embeddings and add to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. metadatas: Optional list of metadatas associated with the texts. kwargs: vectorstore specific parameters Returns: List of ids from adding the texts into the vectorstore. """embeddings=self.embedding.embed_documents(list(texts))returnself.add_embeddings(texts=texts,embeddings=embeddings,metadatas=metadatas,ids=ids,**kwargs)
[docs]defsimilarity_search(self,query:str,k:int=4,params:Dict[str,Any]={},filter:Optional[Dict[str,Any]]=None,**kwargs:Any,)->List[Document]:"""Run similarity search with Neo4jVector. Args: query (str): Query text to search for. k (int): Number of results to return. Defaults to 4. params (Dict[str, Any]): The search params for the index type. Defaults to empty dict. filter (Optional[Dict[str, Any]]): Dictionary of argument(s) to filter on metadata. Defaults to None. Returns: List of Documents most similar to the query. """embedding=self.embedding.embed_query(text=query)returnself.similarity_search_by_vector(embedding=embedding,k=k,query=query,params=params,filter=filter,**kwargs,)
[docs]defsimilarity_search_with_score(self,query:str,k:int=4,params:Dict[str,Any]={},filter:Optional[Dict[str,Any]]=None,**kwargs:Any,)->List[Tuple[Document,float]]:"""Return docs most similar to query. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. params (Dict[str, Any]): The search params for the index type. Defaults to empty dict. filter (Optional[Dict[str, Any]]): Dictionary of argument(s) to filter on metadata. Defaults to None. Returns: List of Documents most similar to the query and score for each """embedding=self.embedding.embed_query(query)docs=self.similarity_search_with_score_by_vector(embedding=embedding,k=k,query=query,params=params,filter=filter,**kwargs,)returndocs
[docs]defsimilarity_search_with_score_by_vector(self,embedding:List[float],k:int=4,filter:Optional[Dict[str,Any]]=None,params:Dict[str,Any]={},**kwargs:Any,)->List[Tuple[Document,float]]:""" Perform a similarity search in the Neo4j database using a given vector and return the top k similar documents with their scores. This method uses a Cypher query to find the top k documents that are most similar to a given embedding. The similarity is measured using a vector index in the Neo4j database. The results are returned as a list of tuples, each containing a Document object and its similarity score. Args: embedding (List[float]): The embedding vector to compare against. k (int, optional): The number of top similar documents to retrieve. filter (Optional[Dict[str, Any]]): Dictionary of argument(s) to filter on metadata. Defaults to None. params (Dict[str, Any]): The search params for the index type. Defaults to empty dict. Returns: List[Tuple[Document, float]]: A list of tuples, each containing a Document object and its similarity score. 
"""iffilter:# Verify that 5.18 or later is usedifnotself.support_metadata_filter:raiseValueError("Metadata filtering is only supported in ""Neo4j version 5.18 or greater")# Metadata filtering and hybrid doesn't workifself.search_type==SearchType.HYBRID:raiseValueError("Metadata filtering can't be use in combination with ""a hybrid search approach")parallel_query=("CYPHER runtime = parallel parallelRuntimeSupport=all "ifself._is_enterpriseelse"")base_index_query=parallel_query+(f"MATCH (n:`{self.node_label}`) WHERE "f"n.`{self.embedding_node_property}` IS NOT NULL AND "f"size(n.`{self.embedding_node_property}`) = "f"toInteger({self.embedding_dimension}) AND ")base_cosine_query=(" WITH n as node, vector.similarity.cosine("f"n.`{self.embedding_node_property}`, ""$embedding) AS score ORDER BY score DESC LIMIT toInteger($k) ")filter_snippets,filter_params=construct_metadata_filter(filter)index_query=base_index_query+filter_snippets+base_cosine_queryelse:index_query=_get_search_index_query(self.search_type,self._index_type)filter_params={}ifself._index_type==IndexType.RELATIONSHIP:ifkwargs.get("return_embeddings"):default_retrieval=(f"RETURN relationship.`{self.text_node_property}` AS text, score, "f"relationship {{.*, `{self.text_node_property}`: Null, "f"`{self.embedding_node_property}`: Null, id: Null, "f"_embedding_: relationship.`{self.embedding_node_property}`}} ""AS metadata")else:default_retrieval=(f"RETURN relationship.`{self.text_node_property}` AS text, score, "f"relationship {{.*, `{self.text_node_property}`: Null, "f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata")else:ifkwargs.get("return_embeddings"):default_retrieval=(f"RETURN node.`{self.text_node_property}` AS text, score, "f"node {{.*, `{self.text_node_property}`: Null, "f"`{self.embedding_node_property}`: Null, id: Null, "f"_embedding_: node.`{self.embedding_node_property}`}} AS metadata")else:default_retrieval=(f"RETURN node.`{self.text_node_property}` AS text, score, "f"node {{.*, 
`{self.text_node_property}`: Null, "f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata")retrieval_query=(self.retrieval_queryifself.retrieval_queryelsedefault_retrieval)read_query=index_query+retrieval_queryparameters={"index":self.index_name,"k":k,"embedding":embedding,"keyword_index":self.keyword_index_name,"query":remove_lucene_chars(kwargs["query"]),**params,**filter_params,}results=self.query(read_query,params=parameters)ifany(result["text"]isNoneforresultinresults):ifnotself.retrieval_query:raiseValueError(f"Make sure that none of the `{self.text_node_property}` "f"properties on nodes with label `{self.node_label}` ""are missing or empty")else:raiseValueError("Inspect the `retrieval_query` and ensure it doesn't ""return None for the `text` column")ifkwargs.get("return_embeddings")andany(result["metadata"]["_embedding_"]isNoneforresultinresults):ifnotself.retrieval_query:raiseValueError(f"Make sure that none of the `{self.embedding_node_property}` "f"properties on nodes with label `{self.node_label}` ""are missing or empty")else:raiseValueError("Inspect the `retrieval_query` and ensure it doesn't ""return None for the `_embedding_` metadata column")docs=[(Document(page_content=dict_to_yaml_str(result["text"])ifisinstance(result["text"],dict)elseresult["text"],metadata={k:vfork,vinresult["metadata"].items()ifvisnotNone},),result["score"],)forresultinresults]returndocs
[docs]defsimilarity_search_by_vector(self,embedding:List[float],k:int=4,filter:Optional[Dict[str,Any]]=None,params:Dict[str,Any]={},**kwargs:Any,)->List[Document]:"""Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter (Optional[Dict[str, Any]]): Dictionary of argument(s) to filter on metadata. Defaults to None. params (Dict[str, Any]): The search params for the index type. Defaults to empty dict. Returns: List of Documents most similar to the query vector. """docs_and_scores=self.similarity_search_with_score_by_vector(embedding=embedding,k=k,filter=filter,params=params,**kwargs)return[docfordoc,_indocs_and_scores]
[docs]@classmethoddeffrom_texts(cls:Type[Neo4jVector],texts:List[str],embedding:Embeddings,metadatas:Optional[List[dict]]=None,distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,ids:Optional[List[str]]=None,**kwargs:Any,)->Neo4jVector:""" Return Neo4jVector initialized from texts and embeddings. Neo4j credentials are required in the form of `url`, `username`, and `password` and optional `database` parameters. """embeddings=embedding.embed_documents(list(texts))returncls.__from(texts,embeddings,embedding,metadatas=metadatas,ids=ids,distance_strategy=distance_strategy,**kwargs,)
[docs]@classmethoddeffrom_embeddings(cls,text_embeddings:List[Tuple[str,List[float]]],embedding:Embeddings,metadatas:Optional[List[dict]]=None,distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,ids:Optional[List[str]]=None,pre_delete_collection:bool=False,**kwargs:Any,)->Neo4jVector:"""Construct Neo4jVector wrapper from raw documents and pre- generated embeddings. Return Neo4jVector initialized from documents and embeddings. Neo4j credentials are required in the form of `url`, `username`, and `password` and optional `database` parameters. Example: .. code-block:: python from langchain_community.vectorstores.neo4j_vector import Neo4jVector from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() text_embeddings = embeddings.embed_documents(texts) text_embedding_pairs = list(zip(texts, text_embeddings)) vectorstore = Neo4jVector.from_embeddings( text_embedding_pairs, embeddings) """texts=[t[0]fortintext_embeddings]embeddings=[t[1]fortintext_embeddings]returncls.__from(texts,embeddings,embedding,metadatas=metadatas,ids=ids,distance_strategy=distance_strategy,pre_delete_collection=pre_delete_collection,**kwargs,)
[docs]@classmethoddeffrom_existing_index(cls:Type[Neo4jVector],embedding:Embeddings,index_name:str,search_type:SearchType=DEFAULT_SEARCH_TYPE,keyword_index_name:Optional[str]=None,**kwargs:Any,)->Neo4jVector:""" Get instance of an existing Neo4j vector index. This method will return the instance of the store without inserting any new embeddings. Neo4j credentials are required in the form of `url`, `username`, and `password` and optional `database` parameters along with the `index_name` definition. """ifsearch_type==SearchType.HYBRIDandnotkeyword_index_name:raiseValueError("keyword_index name has to be specified ""when using hybrid search option")store=cls(embedding=embedding,index_name=index_name,keyword_index_name=keyword_index_name,search_type=search_type,**kwargs,)embedding_dimension,index_type=store.retrieve_existing_index()# Raise error if relationship index typeifindex_type=="RELATIONSHIP":raiseValueError("Relationship vector index is not supported with ""`from_existing_index` method. Please use the ""`from_existing_relationship_index` method.")ifnotindex_type:raiseValueError("The specified vector index name does not exist. ""Make sure to check if you spelled it correctly")# Check if embedding function and vector index dimensions matchifembedding_dimensionandnotstore.embedding_dimension==embedding_dimension:raiseValueError("The provided embedding function and vector index ""dimensions do not match.\n"f"Embedding function dimension: {store.embedding_dimension}\n"f"Vector index dimension: {embedding_dimension}")ifsearch_type==SearchType.HYBRID:fts_node_label=store.retrieve_existing_fts_index()# If the FTS index doesn't exist yetifnotfts_node_label:raiseValueError("The specified keyword index name does not exist. ""Make sure to check if you spelled it correctly")else:# Validate that FTS and Vector index use the same informationifnotfts_node_label==store.node_label:raiseValueError("Vector and keyword index don't index the same node label")returnstore
@classmethod
def from_existing_relationship_index(
    cls: Type[Neo4jVector],
    embedding: Embeddings,
    index_name: str,
    search_type: SearchType = DEFAULT_SEARCH_TYPE,
    **kwargs: Any,
) -> Neo4jVector:
    """Attach to an existing Neo4j relationship vector index.

    The store is constructed and the named index is looked up and
    validated. No embeddings are written.

    Neo4j credentials are required in the form of `url`, `username`,
    and `password` and optional `database` parameters along with
    the `index_name` definition.

    Args:
        embedding: Embedding function used by the store.
        index_name: Name of the existing relationship vector index.
        search_type: Must not be hybrid; note it is only validated here
            and is not passed on to the constructor.
        **kwargs: Forwarded to the class constructor.

    Returns:
        The validated store instance.

    Raises:
        ValueError: If hybrid search is requested, the index does not
            exist, the index is a node index, or the embedding dimensions
            differ.
    """
    if search_type == SearchType.HYBRID:
        raise ValueError(
            "Hybrid search is not supported in combination "
            "with relationship vector index"
        )

    vector_store = cls(embedding=embedding, index_name=index_name, **kwargs)

    index_dimension, kind_of_index = vector_store.retrieve_existing_index()

    # A falsy index type means the lookup found nothing.
    if not kind_of_index:
        raise ValueError(
            "The specified vector index name does not exist. "
            "Make sure to check if you spelled it correctly"
        )

    # Node indexes have their own dedicated constructor.
    if kind_of_index == "NODE":
        raise ValueError(
            "Node vector index is not supported with "
            "`from_existing_relationship_index` method. Please use the "
            "`from_existing_index` method."
        )

    # Only compare dimensions when the index reported one.
    if index_dimension and vector_store.embedding_dimension != index_dimension:
        raise ValueError(
            "The provided embedding function and vector index "
            "dimensions do not match.\n"
            f"Embedding function dimension: {vector_store.embedding_dimension}\n"
            f"Vector index dimension: {index_dimension}"
        )

    return vector_store
@classmethod
def from_documents(
    cls: Type[Neo4jVector],
    documents: List[Document],
    embedding: Embeddings,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> Neo4jVector:
    """Create a Neo4jVector store from a list of documents.

    Each document's ``page_content`` and ``metadata`` are extracted and the
    work is delegated to ``from_texts``.

    Neo4j credentials are required in the form of `url`, `username`,
    and `password` and optional `database` parameters.

    Args:
        documents: Documents supplying the texts and metadata.
        embedding: Embedding function forwarded to ``from_texts``.
        distance_strategy: Distance strategy forwarded unchanged.
        ids: Optional ids forwarded unchanged.
        **kwargs: Any further keyword arguments, forwarded unchanged.

    Returns:
        Whatever ``from_texts`` returns.
    """
    return cls.from_texts(
        texts=[document.page_content for document in documents],
        metadatas=[document.metadata for document in documents],
        embedding=embedding,
        distance_strategy=distance_strategy,
        ids=ids,
        **kwargs,
    )
@classmethod
def from_existing_graph(
    cls: Type[Neo4jVector],
    embedding: Embeddings,
    node_label: str,
    embedding_node_property: str,
    text_node_properties: List[str],
    *,
    keyword_index_name: Optional[str] = "keyword",
    index_name: str = "vector",
    search_type: SearchType = DEFAULT_SEARCH_TYPE,
    retrieval_query: str = "",
    **kwargs: Any,
) -> Neo4jVector:
    """
    Initialize and return a Neo4jVector instance from an existing graph.

    This method initializes a Neo4jVector instance using the provided
    parameters and the existing graph. It validates the existence of
    the indices and creates new ones if they don't exist. Afterwards it
    computes and stores embeddings, in batches, for every `node_label`
    node whose `embedding_node_property` is null and that has at least
    one non-null property among `text_node_properties`.

    Args:
        embedding: Embedding function used to embed node text.
        node_label: Label of the nodes to index.
        embedding_node_property: Node property that stores the vector.
        text_node_properties: Node properties concatenated into the text
            that gets embedded. Must not be empty.
        keyword_index_name: Name of the full-text index (hybrid search).
        index_name: Name of the vector index.
        search_type: Vector-only or hybrid search.
        retrieval_query: Custom retrieval query; when empty, one is built
            from `text_node_properties`.
        **kwargs: Forwarded to the class constructor.

    Returns:
        Neo4jVector: An instance of Neo4jVector initialized with the
        provided parameters and existing graph.

    Raises:
        ValueError: If `text_node_properties` is empty, the existing
            vector index is a relationship index, its dimension differs
            from the embedding function's, or the existing keyword index
            covers a different node label.

    Example:
        >>> neo4j_vector = Neo4jVector.from_existing_graph(
        ...     embedding=my_embedding,
        ...     node_label="Document",
        ...     embedding_node_property="embedding",
        ...     text_node_properties=["title", "content"]
        ... )

    Note:
        Neo4j credentials are required in the form of `url`, `username`,
        and `password`, and optional `database` parameters passed as
        additional keyword arguments.
    """
    # Nodes embedded per round trip. Used both as the query LIMIT and as
    # the loop's termination test, so the two can never drift apart.
    batch_size = 1000

    # Validate the list is not empty
    if not text_node_properties:
        raise ValueError(
            "Parameter `text_node_properties` must not be an empty list"
        )

    # Prefer retrieval query from params, otherwise construct it
    if not retrieval_query:
        retrieval_query = (
            f"RETURN reduce(str='', k IN {text_node_properties} |"
            " str + '\\n' + k + ': ' + coalesce(node[k], '')) AS text, "
            "node {.*, `"
            + embedding_node_property
            + "`: Null, id: Null, "
            + ", ".join([f"`{prop}`: Null" for prop in text_node_properties])
            + "} AS metadata, score"
        )

    store = cls(
        embedding=embedding,
        index_name=index_name,
        keyword_index_name=keyword_index_name,
        search_type=search_type,
        retrieval_query=retrieval_query,
        node_label=node_label,
        embedding_node_property=embedding_node_property,
        **kwargs,
    )

    # Check if the vector index already exists
    embedding_dimension, index_type = store.retrieve_existing_index()

    # Raise error if relationship index type
    if index_type == "RELATIONSHIP":
        raise ValueError(
            "`from_existing_graph` method does not support "
            " existing relationship vector index. "
            "Please use `from_existing_relationship_index` method"
        )

    # If the vector index doesn't exist yet
    if not index_type:
        store.create_new_index()
    # If the index already exists, check if embedding dimensions match
    elif embedding_dimension and store.embedding_dimension != embedding_dimension:
        raise ValueError(
            f"Index with name {store.index_name} already exists. "
            "The provided embedding function and vector index "
            "dimensions do not match.\n"
            f"Embedding function dimension: {store.embedding_dimension}\n"
            f"Vector index dimension: {embedding_dimension}"
        )

    # FTS index for Hybrid search
    if search_type == SearchType.HYBRID:
        fts_node_label = store.retrieve_existing_fts_index(text_node_properties)
        # If the FTS index doesn't exist yet
        if not fts_node_label:
            store.create_new_keyword_index(text_node_properties)
        # Validate that FTS and Vector index use the same information
        elif fts_node_label != store.node_label:
            raise ValueError(
                "Vector and keyword index don't index the same node label"
            )

    # Both queries are loop-invariant, so build them once.
    # The embedding property is backtick-quoted, matching how the node
    # label and the retrieval query treat identifiers, so names containing
    # spaces or other special characters remain valid Cypher.
    fetch_query = (
        f"MATCH (n:`{node_label}`) "
        f"WHERE n.`{embedding_node_property}` IS null "
        "AND any(k in $props WHERE n[k] IS NOT null) "
        "RETURN elementId(n) AS id, reduce(str='',"
        "k IN $props | str + '\\n' + k + ':' + coalesce(n[k], '')) AS text "
        f"LIMIT {batch_size}"
    )
    update_query = (
        "UNWIND $data AS row "
        f"MATCH (n:`{node_label}`) "
        "WHERE elementId(n) = row.id "
        "CALL db.create.setNodeVectorProperty(n, "
        f"'{embedding_node_property}', row.embedding) "
        "RETURN count(*)"
    )

    # Populate embeddings
    while True:
        data = store.query(fetch_query, params={"props": text_node_properties})
        if not data:
            break

        text_embeddings = embedding.embed_documents([el["text"] for el in data])
        params = {
            "data": [
                {"id": el["id"], "embedding": vector}
                for el, vector in zip(data, text_embeddings)
            ]
        }
        store.query(update_query, params=params)

        # A short batch means there is nothing left to embed
        if len(data) < batch_size:
            break

    return store
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        query: search query text.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        filter: Filter on metadata properties, e.g.
            {
                "str_property": "foo",
                "int_property": 123
            }
        **kwargs: Forwarded to the underlying similarity search.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    query_vector = self.embedding.embed_query(query)

    # Over-fetch candidates, asking for each one's embedding to be included
    # in its metadata under "_embedding_".
    candidates = self.similarity_search_with_score_by_vector(
        embedding=query_vector,
        query=query,
        k=fetch_k,
        return_embeddings=True,
        filter=filter,
        **kwargs,
    )

    candidate_vectors = []
    for candidate_doc, _score in candidates:
        candidate_vectors.append(candidate_doc.metadata["_embedding_"])

    chosen = maximal_marginal_relevance(
        np.array(query_vector),
        candidate_vectors,
        lambda_mult=lambda_mult,
        k=k,
    )

    results = []
    for position in chosen:
        results.append(candidates[position][0])

    # The embedding was only needed for MMR; strip it before returning.
    for picked in results:
        del picked.metadata["_embedding_"]

    return results
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Choose the function that converts a raw score to a relevance score.

    The 'correct' relevance function
    may differ depending on a few things, including:
    - the distance / similarity metric used by the VectorStore
    - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
    - embedding dimensionality
    - etc.

    Returns:
        ``self.override_relevance_score_fn`` when it is set; otherwise an
        identity function for the cosine and euclidean distance
        strategies, i.e. the score is passed through unchanged.

    Raises:
        ValueError: If no override is set and the distance strategy is
            neither cosine nor euclidean.
    """
    if self.override_relevance_score_fn is not None:
        return self.override_relevance_score_fn

    # Default strategy is to rely on distance strategy provided
    # in vectorstore constructor
    if self._distance_strategy in (
        DistanceStrategy.COSINE,
        DistanceStrategy.EUCLIDEAN_DISTANCE,
    ):
        # NOTE(review): presumably the index already yields normalized
        # similarity scores, hence no rescaling here; confirm.
        return lambda x: x

    raise ValueError(
        "No supported normalization function"
        f" for distance_strategy of {self._distance_strategy}. "
        "Consider providing relevance_score_fn to Neo4jVector constructor."
    )