class Neo4jGraph(GraphStore):
    """Neo4j database wrapper for various graph operations.

    Parameters:
        url (Optional[str]): The URL of the Neo4j database server. When not
            given, it is resolved through ``get_from_dict_or_env`` using the
            ``NEO4J_URI`` environment key.
        username (Optional[str]): The username for database authentication.
            Resolved with the ``NEO4J_USERNAME`` environment key when not
            given.
        password (Optional[str]): The password for database authentication.
            Resolved with the ``NEO4J_PASSWORD`` environment key when not
            given. If both ``username`` and ``password`` are empty strings,
            authentication is assumed to be disabled.
        database (Optional[str]): The name of the database to connect to.
            Resolved with the ``NEO4J_DATABASE`` environment key when not
            given. Default is 'neo4j'.
        timeout (Optional[float]): The timeout for transactions in seconds.
            Useful for terminating long-running queries.
            By default, there is no timeout set.
        sanitize (bool): A flag to indicate whether to remove lists with more
            than 128 elements from results. Useful for removing
            embedding-like properties from database responses.
            Default is False.
        refresh_schema (bool): A flag whether to refresh schema information
            at initialization. Default is True.
        enhanced_schema (bool): A flag whether to scan the database for
            example values and use them in the graph schema.
            Default is False.
        driver_config (Dict): Configuration passed to Neo4j Driver.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion,
        mutation of data if appropriately prompted or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        timeout: Optional[float] = None,
        sanitize: bool = False,
        refresh_schema: bool = True,
        *,
        driver_config: Optional[Dict] = None,
        enhanced_schema: bool = False,
    ) -> None:
        """Create a new Neo4j graph wrapper instance.

        Raises:
            ValueError: If the connection cannot be verified (bad driver
                config, unreachable URL, or wrong credentials), or if the
                schema refresh fails because APOC procedures are missing.
            neo4j.exceptions.ClientError: Any other client error raised
                while refreshing the schema is propagated unchanged.
        """
        url = get_from_dict_or_env({"url": url}, "url", "NEO4J_URI")
        # if username and password are "", assume Neo4j auth is disabled
        if username == "" and password == "":
            auth = None
        else:
            username = get_from_dict_or_env(
                {"username": username},
                "username",
                "NEO4J_USERNAME",
            )
            password = get_from_dict_or_env(
                {"password": password},
                "password",
                "NEO4J_PASSWORD",
            )
            auth = (username, password)
        database = get_from_dict_or_env(
            {"database": database}, "database", "NEO4J_DATABASE", "neo4j"
        )

        self._driver = neo4j.GraphDatabase.driver(
            url, auth=auth, **(driver_config or {})
        )
        self._database = database
        self.timeout = timeout
        self.sanitize = sanitize
        self._enhanced_schema = enhanced_schema
        self.schema: str = ""
        self.structured_schema: Dict[str, Any] = {}

        # Verify connection. The driver error is chained as the cause so the
        # underlying reason is not lost behind the friendlier ValueError.
        try:
            self._driver.verify_connectivity()
        except neo4j.exceptions.ConfigurationError as e:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the driver config is correct"
            ) from e
        except neo4j.exceptions.ServiceUnavailable as e:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the url is correct"
            ) from e
        except neo4j.exceptions.AuthError as e:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the username and password are correct"
            ) from e

        # Set schema
        if refresh_schema:
            try:
                self.refresh_schema()
            except neo4j.exceptions.ClientError as e:
                if e.code == "Neo.ClientError.Procedure.ProcedureNotFound":
                    raise ValueError(
                        "Could not use APOC procedures. "
                        "Please ensure the APOC plugin is installed in Neo4j and that "
                        "'apoc.meta.data()' is allowed in Neo4j configuration "
                    ) from e
                # Any other client error is re-raised with its traceback intact.
                raise

    def _check_driver_state(self) -> None:
        """
        Check if the driver is available and ready for operations.

        The driver is considered closed when the ``_driver`` attribute is
        missing, which is how ``close()`` marks the instance as closed.

        Raises:
            RuntimeError: If the driver has been closed or is not initialized.
        """
        if not hasattr(self, "_driver"):
            raise RuntimeError(
                "Cannot perform operations - Neo4j connection has been closed"
            )

    @property
    def get_schema(self) -> str:
        """Returns the schema of the Graph"""
        return self.schema

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        """Returns the structured schema of the Graph"""
        return self.structured_schema

    def query(
        self,
        query: str,
        params: Optional[dict] = None,
        session_params: Optional[dict] = None,
    ) -> List[Dict[str, Any]]:
        """Query Neo4j database.

        When no ``session_params`` are given, the query is first run through
        ``driver.execute_query``. If that fails with an error indicating the
        statement must run in an implicit transaction, the query is retried
        through a plain session. When ``session_params`` are given, the
        session path is used directly.

        Args:
            query (str): The Cypher query to execute.
            params (Optional[dict]): The parameters to pass to the query.
                Defaults to no parameters.
            session_params (Optional[dict]): Parameters to pass to the
                session used for executing the query. The mapping is never
                modified; ``database`` is filled in with this instance's
                database on a copy when the caller did not provide it.

        Returns:
            List[Dict[str, Any]]: The list of dictionaries containing the
            query results.

        Raises:
            RuntimeError: If the connection has been closed.
            neo4j.exceptions.Neo4jError: If the query fails for a reason
                other than requiring an implicit transaction.
        """
        self._check_driver_state()
        from neo4j import Query
        from neo4j.exceptions import Neo4jError

        # None defaults avoid sharing one mutable dict across calls.
        query_params: dict = {} if params is None else params

        def to_json(records: Any) -> List[Dict[str, Any]]:
            rows = [record.data() for record in records]
            if self.sanitize:
                rows = [_value_sanitize(row) for row in rows]
            return rows

        if not session_params:
            try:
                data, _, _ = self._driver.execute_query(
                    Query(text=query, timeout=self.timeout),
                    database_=self._database,
                    parameters_=query_params,
                )
                return to_json(data)
            except Neo4jError as e:
                message = e.message
                is_call_in_transaction_error = (
                    e.code
                    in (
                        "Neo.DatabaseError.Statement.ExecutionFailed",
                        "Neo.DatabaseError.Transaction.TransactionStartFailed",
                    )
                    and message is not None
                    and "in an implicit transaction" in message
                )
                is_periodic_commit_error = (
                    e.code == "Neo.ClientError.Statement.SemanticError"
                    and message is not None
                    and (
                        "in an open transaction is not possible" in message
                        or "tried to execute in an explicit transaction" in message
                    )
                )
                if not (is_call_in_transaction_error or is_periodic_commit_error):
                    raise

        # fallback to allow implicit transactions
        # Build a fresh mapping instead of calling setdefault() on the
        # argument: mutating it would leak state into the caller's dict.
        session_config = {"database": self._database, **(session_params or {})}
        with self._driver.session(**session_config) as session:
            result = session.run(
                Query(text=query, timeout=self.timeout), query_params
            )
            return to_json(result)

    def refresh_schema(self) -> None:
        """
        Refreshes the Neo4j graph schema information.

        Updates ``structured_schema`` from the database and then rebuilds the
        formatted ``schema`` string from it.

        Raises:
            RuntimeError: If the connection has been closed.
        """
        self._check_driver_state()
        self.structured_schema = get_structured_schema(
            driver=self._driver,
            is_enhanced=self._enhanced_schema,
            database=self._database,
            timeout=self.timeout,
            sanitize=self.sanitize,
        )
        self.schema = format_schema(
            schema=self.structured_schema, is_enhanced=self._enhanced_schema
        )

    def add_graph_documents(
        self,
        graph_documents: List[GraphDocument],
        include_source: bool = False,
        baseEntityLabel: bool = False,
    ) -> None:
        """
        This method constructs nodes and relationships in the graph based on the
        provided GraphDocument objects.

        Parameters:
        - graph_documents (List[GraphDocument]): A list of GraphDocument objects
        that contain the nodes and relationships to be added to the graph. Each
        GraphDocument should encapsulate the structure of part of the graph,
        including nodes, relationships, and optionally the source document
        information.
        - include_source (bool, optional): If True, stores the source document
        and links it to nodes in the graph using the MENTIONS relationship.
        This is useful for tracing back the origin of data. Merges source
        documents based on the `id` property from the source document metadata
        if available; otherwise it calculates the MD5 hash of `page_content`
        for merging process. Defaults to False.
        - baseEntityLabel (bool, optional): If True, each newly created node
        gets a secondary __Entity__ label, which is indexed and improves import
        speed and performance. Defaults to False.

        Raises:
            RuntimeError: If the connection has been closed.
            TypeError: If ``include_source`` is True but at least one document
                has no ``source``.
        """
        self._check_driver_state()
        if baseEntityLabel:  # Check if constraint already exists
            constraints = self.structured_schema.get("metadata", {}).get(
                "constraint", []
            )
            constraint_exists = any(
                el["labelsOrTypes"] == [BASE_ENTITY_LABEL]
                and el["properties"] == ["id"]
                for el in constraints
            )
            if not constraint_exists:
                # Create constraint
                self.query(
                    f"CREATE CONSTRAINT IF NOT EXISTS FOR (b:{BASE_ENTITY_LABEL}) "
                    "REQUIRE b.id IS UNIQUE;"
                )
                self.refresh_schema()  # Refresh constraint information

        # Check each graph_document has a source when include_source is true
        if include_source:
            for doc in graph_documents:
                if doc.source is None:
                    raise TypeError(
                        "include_source is set to True, "
                        "but at least one document has no `source`."
                    )

        node_import_query = _get_node_import_query(baseEntityLabel, include_source)
        rel_import_query = _get_rel_import_query(baseEntityLabel)
        for document in graph_documents:
            # Remove backticks from node types before building the payload, so
            # the payload does not depend on `__dict__` aliasing the node.
            for node in document.nodes:
                node.type = _remove_backticks(node.type)

            node_import_query_params: Dict[str, Any] = {
                "data": [el.__dict__ for el in document.nodes]
            }
            if include_source and document.source:
                # Fall back to a content hash so the source can be merged on `id`.
                if not document.source.metadata.get("id"):
                    document.source.metadata["id"] = md5(
                        document.source.page_content.encode("utf-8")
                    ).hexdigest()
                node_import_query_params["document"] = document.source.__dict__

            # Import nodes
            self.query(node_import_query, node_import_query_params)
            # Import relationships
            self.query(
                rel_import_query,
                {
                    "data": [
                        {
                            "source": el.source.id,
                            "source_label": _remove_backticks(el.source.type),
                            "target": el.target.id,
                            "target_label": _remove_backticks(el.target.type),
                            "type": _remove_backticks(
                                el.type.replace(" ", "_").upper()
                            ),
                            "properties": el.properties,
                        }
                        for el in document.relationships
                    ]
                },
            )

    def close(self) -> None:
        """
        Explicitly close the Neo4j driver connection.

        Delegates connection management to the Neo4j driver. Calling this on
        an already-closed instance is a no-op.
        """
        if hasattr(self, "_driver"):
            self._driver.close()
            # Remove the driver attribute to indicate closure
            delattr(self, "_driver")

    def __enter__(self) -> "Neo4jGraph":
        """
        Enter the runtime context for the Neo4j graph connection.

        Enables use of the graph connection with the 'with' statement.
        This method allows for automatic resource management and ensures
        that the connection is properly handled.

        Returns:
            Neo4jGraph: The current graph connection instance

        Example:
            with Neo4jGraph(...) as graph:
                graph.query(...)  # Connection automatically managed
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """
        Exit the runtime context for the Neo4j graph connection.

        This method is automatically called when exiting a 'with' statement.
        It ensures that the database connection is closed, regardless of
        whether an exception occurred during the context's execution.

        Args:
            exc_type: The type of exception that caused the context to exit
                      (None if no exception occurred)
            exc_val: The exception instance that caused the context to exit
                     (None if no exception occurred)
            exc_tb: The traceback for the exception (None if no exception
                    occurred)

        Note:
            Any exception is re-raised after the connection is closed.
        """
        self.close()

    def __del__(self) -> None:
        """
        Destructor for the Neo4j graph connection.

        This method is called during garbage collection to ensure that
        database resources are released if not explicitly closed.

        Caution:
            - Do not rely on this method for deterministic resource cleanup
            - Always prefer explicit .close() or context manager

        Best practices:
            1. Use context manager:
               with Neo4jGraph(...) as graph:
                   ...
            2. Explicitly close:
               graph = Neo4jGraph(...)
               try:
                   ...
               finally:
                   graph.close()
        """
        try:
            self.close()
        except Exception:
            # Suppress any exceptions during garbage collection
            pass