Source code for langchain_community.graphs.nebula_graph
import logging
from string import Template
from typing import Any, Dict, Optional

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Query template parameterised by ``$edge_type``: it matches one edge of that
# type and returns a "(:<src tag>)-[:<edge_type>]->(:<dst tag>)" string in
# the ``rels`` column.
# NOTE(review): the clauses run together in this literal ("limit 1MATCH",
# "dst(e)RETURN"); line breaks inside the string appear to have been lost.
# Confirm the text against the upstream file before relying on it.
rel_query = Template(
    """MATCH ()-[e:`$edge_type`]->() WITH e limit 1MATCH (m)-[:`$edge_type`]->(n) WHERE id(m) == src(e) AND id(n) == dst(e)RETURN "(:" + tags(m)[0] + ")-[:$edge_type]->(:" + tags(n)[0] + ")" AS rels"""
)

# Upper bound on the retry counter used by ``NebulaGraph.execute``.
RETRY_TIMES = 3
class NebulaGraph:
    """NebulaGraph wrapper for graph operations.

    The class keeps a NebulaGraph session pool and a schema string. Its
    methods are meant to follow ``Neo4jGraph`` to bring ease to the user
    space; note that the class declares no base class, so nothing is
    actually inherited (NOTE(review): confirm the intended parity with
    ``Neo4jGraph``).

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would delete or mutate data
        if appropriately prompted, or read sensitive data if such data is
        present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """
def __init__(
    self,
    space: str,
    username: str = "root",
    password: str = "nebula",
    address: str = "127.0.0.1",
    port: int = 9669,
    session_pool_size: int = 30,
) -> None:
    """Create a new NebulaGraph wrapper instance.

    Checks that the client packages are importable, stores the connection
    settings on the instance, opens a session pool and then loads the
    schema through ``refresh_schema``.

    Args:
        space: Graph space passed to the session pool.
        username: User name passed to the session pool.
        password: Password passed to the session pool.
        address: Host of the graph service.
        port: Port of the graph service.
        session_pool_size: Used as the session pool's ``max_size``.

    Raises:
        ImportError: If ``nebula3`` or ``pandas`` cannot be imported.
        ValueError: If ``refresh_schema`` raises; errors raised while
            creating the session pool propagate unchanged.
    """
    try:
        import nebula3  # noqa: F401
        import pandas  # noqa: F401
    except ImportError as e:
        # Chain the original error so the traceback names the package
        # that is actually missing.
        raise ImportError(
            "Please install NebulaGraph Python client and pandas first: "
            "`pip install nebula3-python pandas`"
        ) from e

    self.username = username
    self.password = password
    self.address = address
    self.port = port
    self.space = space
    self.session_pool_size = session_pool_size

    # The pool is built from the attributes assigned above, so it must be
    # created after them.
    self.session_pool = self._get_session_pool()
    self.schema = ""
    # Set schema
    try:
        self.refresh_schema()
    except Exception as e:
        # Keep the ValueError contract but preserve the underlying cause.
        raise ValueError(f"Could not refresh schema. Error: {e}") from e
def _get_session_pool(self) -> Any:
    """Create and initialise a nebula3 ``SessionPool`` from the stored settings.

    Returns:
        The initialised session pool.

    Raises:
        AssertionError: If any of username, password, address, port or
            space is falsy.
        ValueError: If the host is rejected (``InValidHostname``),
            authentication fails (``AuthFailedException``) or pool
            initialisation raises ``RuntimeError``.
    """
    required = (
        self.username,
        self.password,
        self.address,
        self.port,
        self.space,
    )
    if not all(required):
        # Raised explicitly instead of using ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        raise AssertionError(
            "Please provide all of the following parameters: "
            "username, password, address, port, space"
        )

    from nebula3.Config import SessionPoolConfig
    from nebula3.Exception import AuthFailedException, InValidHostname
    from nebula3.gclient.net.SessionPool import SessionPool

    config = SessionPoolConfig()
    config.max_size = self.session_pool_size

    try:
        session_pool = SessionPool(
            self.username,
            self.password,
            self.space,
            [(self.address, self.port)],
        )
    except InValidHostname as e:
        raise ValueError(
            "Could not connect to NebulaGraph database. "
            "Please ensure that the address and port are correct"
        ) from e

    try:
        session_pool.init(config)
    except AuthFailedException as e:
        raise ValueError(
            "Could not connect to NebulaGraph database. "
            "Please ensure that the username and password are correct"
        ) from e
    except RuntimeError as e:
        raise ValueError(f"Error initializing session pool. Error: {e}") from e

    return session_pool


def __del__(self) -> None:
    """Close the session pool when the instance is garbage-collected.

    Best effort: a failure to close is logged as a warning, never raised.
    """
    # ``__init__`` can raise before ``session_pool`` is assigned (missing
    # package, failed connection); in that case there is nothing to close
    # and no warning should be logged.
    session_pool = getattr(self, "session_pool", None)
    if session_pool is None:
        return
    try:
        session_pool.close()
    except Exception as e:
        logger.warning(f"Could not close session pool. Error: {e}")


@property
def get_schema(self) -> str:
    """Returns the schema of the NebulaGraph database"""
    # Exposed as a property (despite the verb-style name), so callers read
    # ``graph.get_schema`` without calling it.
    return self.schema
def execute(self, query: str, params: Optional[dict] = None, retry: int = 0) -> Any:
    """Query NebulaGraph database.

    Args:
        query: Statement passed to the session pool's ``execute_parameter``.
        params: Parameters passed alongside ``query``; ``None`` (or any
            falsy value) is replaced by an empty dict.
        retry: Number of retries already made. It is incremented on each
            recursive retry and compared against ``RETRY_TIMES``.

    Returns:
        The result object returned by ``execute_parameter``. It is returned
        even when ``is_succeeded()`` is false; that case only logs a warning.

    Raises:
        ValueError: If the pool has no valid session, or if a
            ``RuntimeError`` or a connection error
            (``TTransportException`` / ``IOErrorException``) still occurs
            once ``retry`` has reached ``RETRY_TIMES``.
    """
    from nebula3.Exception import IOErrorException, NoValidSessionException
    from nebula3.fbthrift.transport.TTransport import TTransportException

    params = params or {}
    try:
        result = self.session_pool.execute_parameter(query, params)
        if not result.is_succeeded():
            logger.warning(
                f"Error executing query to NebulaGraph. "
                f"Error: {result.error_msg()}\n"
                f"Query: {query}\n"
            )
        return result

    except NoValidSessionException as e:
        # Same text is logged and raised; build it once.
        message = (
            f"No valid session found in session pool. "
            f"Please consider increasing the session pool size. "
            f"Current size: {self.session_pool_size}"
        )
        logger.warning(message)
        raise ValueError(message) from e

    except RuntimeError as e:
        if retry < RETRY_TIMES:
            retry += 1
            logger.warning(
                f"Error executing query to NebulaGraph. "
                f"Retrying ({retry}/{RETRY_TIMES})...\n"
                f"query: {query}\n"
                f"Error: {e}"
            )
            return self.execute(query, params, retry)
        raise ValueError(f"Error executing query to NebulaGraph. Error: {e}") from e

    except (TTransportException, IOErrorException) as e:
        # connection issue, try to recreate session pool
        if retry < RETRY_TIMES:
            retry += 1
            logger.warning(
                f"Connection issue with NebulaGraph. "
                f"Retrying ({retry}/{RETRY_TIMES})...\n to recreate session pool"
            )
            self.session_pool = self._get_session_pool()
            return self.execute(query, params, retry)
        # Retries exhausted: raise instead of falling through and returning
        # None, which callers would only trip over later.
        raise ValueError(
            f"Connection issue with NebulaGraph. "
            f"Giving up after {RETRY_TIMES} retries. Error: {e}"
        ) from e