def __init__(
    self,
    filter_criteria: Optional[Dict] = None,
    **kwargs: Any,
) -> None:
    """Configure the loader and create the (not yet connected) SurrealDB client.

    Args:
        filter_criteria: Optional mapping of field name to required value,
            used to restrict which records are loaded. The key ``table`` is
            rejected because that name is already used as the query
            parameter carrying the table name.
        **kwargs: Recognised keys are popped here; anything left over is
            kept on ``self.kwargs`` (``initialize`` looks for ``db_user``
            and ``db_pass`` there).

            * ``dburl``: connection URL, default ``"ws://localhost:8000/rpc"``.
            * ``ns``: namespace, default ``"langchain"``.
            * ``db``: database name, default ``"database"``.
            * ``table``: table to read from, default ``"documents"``.

    Raises:
        ImportError: If the ``surrealdb`` package is not installed.
        ValueError: If ``dburl`` does not start with ``ws``, or if
            ``filter_criteria`` contains the key ``table``.
    """
    try:
        from surrealdb import Surreal
    except ImportError as e:
        raise ImportError(
            "Cannot import from surrealdb. "
            "please install with `pip install surrealdb`."
        ) from e

    self.dburl = kwargs.pop("dburl", "ws://localhost:8000/rpc")
    if not self.dburl.startswith("ws"):
        raise ValueError("Only websocket connections are supported at this time.")

    self.filter_criteria = filter_criteria or {}
    if "table" in self.filter_criteria:
        raise ValueError(
            "key `table` is not a valid criteria for `filter_criteria` argument."
        )

    self.ns = kwargs.pop("ns", "langchain")
    self.db = kwargs.pop("db", "database")
    self.table = kwargs.pop("table", "documents")

    # Build the client exactly once, after every argument has been validated.
    self.sdb = Surreal(self.dburl)
    self.kwargs = kwargs
async def initialize(self) -> None:
    """Open the SurrealDB connection and select the namespace/database.

    Signs in first when both ``db_user`` and ``db_pass`` were supplied as
    extra keyword arguments (stored on ``self.kwargs``); otherwise the
    sign-in step is skipped.
    """
    await self.sdb.connect()

    has_credentials = all(
        name in self.kwargs for name in ("db_user", "db_pass")
    )
    if has_credentials:
        credentials = {
            "user": self.kwargs["db_user"],
            "pass": self.kwargs["db_pass"],
        }
        await self.sdb.signin(credentials)

    await self.sdb.use(self.ns, self.db)
async def aload(self) -> List[Document]:
    """Load data into Document objects.

    Selects every record from ``self.table``, restricted to records whose
    fields equal the values in ``self.filter_criteria`` (conditions are
    combined with ``AND``).

    Returns:
        One ``Document`` per record in the first statement's ``result``
        list. ``page_content`` is the JSON dump of the whole record. The
        metadata contains the record ``id``, the record's own ``metadata``
        mapping (if any), and the ``ns`` / ``db`` / ``table`` the record
        was read from; the latter take precedence on key collisions.
    """
    criteria = self.filter_criteria or {}

    query = "SELECT * FROM type::table($table)"
    if criteria:
        # NOTE(review): only the *values* are bound as query parameters; the
        # keys are interpolated into the query text, so they must come from
        # a trusted source.
        conditions = " AND ".join(f"{key} = ${key}" for key in criteria)
        query += f" WHERE {conditions}"

    metadata = {"ns": self.ns, "db": self.db, "table": self.table}
    results = await self.sdb.query(query, {"table": self.table, **criteria})

    return [
        Document(
            page_content=json.dumps(result),
            metadata={
                "id": result["id"],
                # Tolerate records stored without a `metadata` field.
                **(result.get("metadata") or {}),
                **metadata,
            },
        )
        for result in results[0]["result"]
    ]