Source code for langchain_mongodb.agent_toolkit.database
"""Wrapper around a MongoDB database."""from__future__importannotationsimportjsonfromdatetimeimportdate,datetimefromimportlib.metadataimportversionfromtypingimportAny,Dict,Iterable,List,Optional,UnionfrombsonimportObjectIdfrombson.binaryimportBinaryfrombson.decimal128importDecimal128frombson.json_utilimportdumpsfrompymongoimportMongoClientfrompymongo.cursorimportCursorfrompymongo.driver_infoimportDriverInfofrompymongo.errorsimportPyMongoErrorNUM_DOCUMENTS_TO_SAMPLE=4MAX_STRING_LENGTH_OF_SAMPLE_DOCUMENT_VALUE=20_BSON_LOOKUP={str:"String",int:"Number",float:"Number",bool:"Boolean",ObjectId:"ObjectId",date:"Date",datetime:"Timestamp",None:"Null",Decimal128:"Decimal128",Binary:"Binary",}
class MongoDBDatabase:
    """Wrapper around a MongoDB database."""

    def __init__(
        self,
        client: MongoClient,
        database: str,
        schema: Optional[str] = None,
        ignore_collections: Optional[List[str]] = None,
        include_collections: Optional[List[str]] = None,
        sample_docs_in_collection_info: int = 3,
        indexes_in_collection_info: bool = False,
    ):
        """Create a MongoDBDatabase from client and database name.

        Args:
            client: Client used for every database operation.
            database: Name of the database to wrap.
            schema: Stored on the instance; not otherwise used by this class.
            ignore_collections: Collections to hide. Mutually exclusive with
                ``include_collections``.
            include_collections: If given, only these collections are exposed.
            sample_docs_in_collection_info: Number of sample documents appended
                to each collection description (0 disables them).
            indexes_in_collection_info: Whether to append index definitions to
                each collection description.

        Raises:
            ValueError: If both ``include_collections`` and
                ``ignore_collections`` are given.
        """
        self._client = client
        self._db = client[database]
        self._schema = schema
        if include_collections and ignore_collections:
            raise ValueError(
                "Cannot specify both include_collections and ignore_collections"
            )
        self._include_colls = set(include_collections or [])
        self._ignore_colls = set(ignore_collections or [])
        # Snapshot taken once here; collections created later are not seen.
        self._all_colls = set(self._db.list_collection_names())
        self._sample_docs_in_coll_info = sample_docs_in_collection_info
        self._indexes_in_coll_info = indexes_in_collection_info

    @classmethod
    def from_connection_string(
        cls,
        connection_string: str,
        database: Optional[str] = None,
        **kwargs: Any,
    ) -> MongoDBDatabase:
        """Construct a MongoDBDatabase from a MongoDB URI.

        Args:
            connection_string: MongoDB connection URI.
            database: Database name; defaults to the database named in the URI
                (``client.get_default_database()``).
            **kwargs: Forwarded to the ``MongoDBDatabase`` constructor.
        """
        client = MongoClient(
            connection_string,
            driver=DriverInfo(name="Langchain", version=version("langchain-mongodb")),
        )
        database = database or client.get_default_database().name
        return cls(client, database, **kwargs)

    def close(self) -> None:
        """Close the resources used by the MongoDBDatabase."""
        self._client.close()

    def get_usable_collection_names(self) -> Iterable[str]:
        """Get names of collections available, sorted.

        When ``include_collections`` was given, exactly those names are
        returned; otherwise every collection seen at construction time minus
        ``ignore_collections``.
        """
        if self._include_colls:
            return sorted(self._include_colls)
        return sorted(self._all_colls - self._ignore_colls)

    @property
    def collection_info(self) -> str:
        """Information about all collections in the database."""
        return self.get_collection_info()

    def get_collection_info(self, collection_names: Optional[List[str]] = None) -> str:
        """Get information about specified collections.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        Each description contains a schema inferred from one sample document.
        If ``indexes_in_collection_info`` is set, the index definitions are
        appended; if ``sample_docs_in_collection_info`` is non-zero, that many
        sample documents are appended. This can increase performance as
        demonstrated in the paper.

        Raises:
            ValueError: If any requested collection is not usable.
        """
        all_coll_names = self.get_usable_collection_names()
        if collection_names is not None:
            missing_collections = set(collection_names).difference(all_coll_names)
            if missing_collections:
                raise ValueError(
                    f"collection_names {missing_collections} not found in database"
                )
            all_coll_names = collection_names

        colls = []
        for coll in all_coll_names:
            # add schema
            schema = self._get_collection_schema(coll)
            coll_info = f"Database name: {self._db.name}\n"
            coll_info += f"Collection name: {coll}\n"
            coll_info += (
                "Schema from a sample of documents from the collection:\n"
                f"{schema.rstrip()}"
            )
            has_extra_info = self._indexes_in_coll_info or self._sample_docs_in_coll_info
            # Extra info is wrapped in a /* ... */ comment block.
            if has_extra_info:
                coll_info += "\n\n/*"
            if self._indexes_in_coll_info:
                coll_info += f"\n{self._get_collection_indexes(coll)}\n"
            if self._sample_docs_in_coll_info:
                coll_info += f"\n{self._get_sample_docs(coll)}\n"
            if has_extra_info:
                coll_info += "*/"
            colls.append(coll_info)
        colls.sort()
        return "\n\n".join(colls)

    def _get_collection_schema(self, collection: str) -> str:
        """Describe the fields of one document from ``collection``.

        Returns an empty string when the collection has no documents.
        """
        doc = self._db[collection].find_one({})
        if doc is None:
            # Empty collection: there is no document to infer a schema from.
            return ""
        return "\n".join(self._parse_doc(doc, ""))

    @staticmethod
    def _bson_type_name(value: Any) -> Optional[str]:
        """Return the schema label for ``value``, or None if its type is unknown.

        The MRO is walked so subclasses of mapped types (e.g. ``bson.Int64``,
        a subclass of ``int``) are recognized; exact types are checked first,
        so ``bool`` still maps to "Boolean" and ``datetime`` not to "Date".
        """
        if value is None:
            return "Null"
        for cls in type(value).__mro__:
            if cls in _BSON_LOOKUP:
                return _BSON_LOOKUP[cls]
        return None

    def _parse_doc(self, doc: Dict[str, Any], prefix: str) -> List[str]:
        """Flatten ``doc`` into ``"dotted.key: Type"`` lines.

        Nested documents are recursed into with dotted keys; arrays are
        described by their first element (``key[]`` for sub-documents,
        ``Array<Type>`` for scalars). Values of unrecognized types are
        omitted; a document with no describable fields yields
        ``"<prefix>: Document"``.
        """
        sub_schema: List[str] = []
        for key, value in doc.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                sub_schema.extend(self._parse_doc(value, full_key))
            elif isinstance(value, list):
                if not value:
                    sub_schema.append(f"{full_key}: Array")
                elif isinstance(value[0], dict):
                    sub_schema.extend(self._parse_doc(value[0], f"{full_key}[]"))
                else:
                    type_name = self._bson_type_name(value[0])
                    if type_name is None:
                        sub_schema.append(f"{full_key}: Array")
                    else:
                        sub_schema.append(f"{full_key}: Array<{type_name}>")
            else:
                type_name = self._bson_type_name(value)
                if type_name is not None:
                    sub_schema.append(f"{full_key}: {type_name}")
        if not sub_schema:
            sub_schema.append(f"{prefix}: Document")
        return sub_schema

    def _get_collection_indexes(self, collection: str) -> str:
        """Return the collection's index definitions as JSON text.

        Returns an empty string when ``list_indexes`` yields nothing.
        """
        indexes = list(self._db[collection].list_indexes())
        if not indexes:
            return ""
        # Index specs can contain BSON-specific values, so use bson.json_util's
        # ``dumps`` rather than ``json.dumps``.
        return f"Collection Indexes:\n{dumps(indexes, indent=2)}"

    def _get_sample_docs(self, collection: str) -> str:
        """Return up to ``sample_docs_in_collection_info`` documents as JSON,
        with long string values shortened by ``_elide_doc``."""
        col = self._db[collection]
        docs = list(col.find({}, limit=self._sample_docs_in_coll_info))
        for doc in docs:
            self._elide_doc(doc)
        return (
            f"{self._sample_docs_in_coll_info} documents from {collection} collection:\n"
            f"{dumps(docs, indent=2)}"
        )

    def _elide_doc(self, doc: Dict[str, Any]) -> None:
        """Truncate long strings in ``doc`` in place, recursing into nested
        documents and (arbitrarily nested) arrays."""
        for key, value in doc.items():
            # Reassigning existing keys does not change the dict's size, so
            # this is safe while iterating.
            doc[key] = self._elide_value(value)

    def _elide_value(self, value: Any) -> Any:
        """Return ``value`` with strings cut to at most
        MAX_STRING_LENGTH_OF_SAMPLE_DOCUMENT_VALUE characters."""
        if isinstance(value, dict):
            self._elide_doc(value)
            return value
        if isinstance(value, list):
            return [self._elide_value(item) for item in value]
        if (
            isinstance(value, str)
            and len(value) > MAX_STRING_LENGTH_OF_SAMPLE_DOCUMENT_VALUE
        ):
            return value[:MAX_STRING_LENGTH_OF_SAMPLE_DOCUMENT_VALUE]
        return value

    def _parse_command(self, command: str) -> Any:
        """Decode the pipeline of a ``db.<coll>.aggregate([...])`` command.

        Only the text from the first ``[`` to the last ``]`` is JSON-decoded,
        so a missing closing parenthesis or a trailing ``;`` is tolerated and
        whitespace inside string values is preserved.

        Raises:
            ValueError: If no ``[...]`` span exists or it is not valid JSON.
        """
        command = command.strip()
        try:
            start = command.index("[")
            end = command.rindex("]")
            return json.loads(command[start : end + 1])
        except ValueError as e:
            # Covers str.index/rindex misses and json.JSONDecodeError.
            raise ValueError(f"Cannot execute command {command}") from e

    def run(self, command: str) -> Union[str, Cursor]:
        """Execute a MongoDB aggregation command and return the results as JSON.

        The command MUST be of the form: `db.collectionName.aggregate(...)`.
        Collection names containing dots (e.g. ``fs.files``) are supported.
        The result is a JSON array string (``"[]"`` when no documents match).

        Raises:
            ValueError: If the command is malformed, is not an aggregation, or
                targets a collection that is not usable.
        """
        command = command.strip()
        if not command.startswith("db."):
            raise ValueError(f"Cannot run command {command}")
        # The collection name is everything between "db." and ".aggregate(".
        head, sep, _ = command.partition(".aggregate(")
        if not sep:
            raise ValueError(f"Cannot execute command {command}")
        col_name = head[len("db.") :]
        if col_name not in self.get_usable_collection_names():
            raise ValueError(f"Collection {col_name} does not exist!")
        agg = self._parse_command(command)
        return dumps(list(self._db[col_name].aggregate(agg)), indent=2)

    def get_collection_info_no_throw(
        self, collection_names: Optional[List[str]] = None
    ) -> str:
        """Like ``get_collection_info`` but returns ``"Error: ..."`` instead of
        raising ``ValueError`` for unknown collections.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)
        """
        try:
            return self.get_collection_info(collection_names)
        except ValueError as e:
            return f"Error: {e}"

    def run_no_throw(self, command: str) -> Union[str, Cursor]:
        """Execute a MongoDB command and return a string representing the results.

        Returns the same JSON text as ``run``. If the command is invalid
        (``ValueError``) or the server reports an error (``PyMongoError``),
        ``"Error: <message>"`` is returned instead of raising.
        """
        try:
            return self.run(command)
        except (PyMongoError, ValueError) as e:
            return f"Error: {e}"

    def get_context(self) -> Dict[str, Any]:
        """Return db context that you may want in agent prompt."""
        collection_names = list(self.get_usable_collection_names())
        collection_info = self.get_collection_info_no_throw()
        return {
            "collection_info": collection_info,
            "collection_names": ", ".join(collection_names),
        }