def __init__(
    self,
    connection_string: str,
    db_name: str,
    collection_name: str,
    *,
    filter_criteria: Optional[Dict] = None,
    field_names: Optional[Sequence[str]] = None,
) -> None:
    """Set up an async MongoDB client bound to one collection.

    Args:
        connection_string: Value handed to ``AsyncIOMotorClient``.
        db_name: Name of the database to read from.
        collection_name: Name of the collection to read from.
        filter_criteria: Query filter used when counting and fetching
            documents. A missing or empty value is stored as ``{}``.
        field_names: Optional field names (dotted names address nested
            fields) used to build the projection and the page content.

    Raises:
        ImportError: If ``motor`` cannot be imported.
        ValueError: If ``connection_string``, ``db_name`` or
            ``collection_name`` is empty.
    """
    try:
        from motor.motor_asyncio import AsyncIOMotorClient
    except ImportError as e:
        raise ImportError(
            "Cannot import from motor, please install with `pip install motor`."
        ) from e

    # Checked in declaration order so the first missing argument is the
    # one that gets reported.
    required_arguments = (
        ("connection_string", connection_string),
        ("db_name", db_name),
        ("collection_name", collection_name),
    )
    for argument_name, supplied in required_arguments:
        if not supplied:
            raise ValueError(f"{argument_name} must be provided.")

    # Plain configuration kept for later use by the loading methods.
    self.db_name = db_name
    self.collection_name = collection_name
    self.field_names = field_names
    self.filter_criteria = filter_criteria or {}

    # Driver handles: client -> database -> collection.
    self.client = AsyncIOMotorClient(connection_string)
    self.db = self.client.get_database(db_name)
    self.collection = self.db.get_collection(collection_name)
def load(self) -> List[Document]:
    """Load data into Document objects.

    Synchronous wrapper around ``aload``.

    When no asyncio event loop is running in the calling thread, the
    coroutine is driven with ``asyncio.run``. When a loop is already
    running (for example when called from inside a coroutine),
    ``asyncio.run`` cannot be used in that thread, so the coroutine is run
    on a fresh event loop in a short-lived worker thread instead. In that
    case the calling thread blocks until loading has finished.

    Returns:
        The documents produced by ``aload``.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises when this thread has no running loop.
        loop_is_running = False
    else:
        loop_is_running = True

    if not loop_is_running:
        return asyncio.run(self.aload())

    # Imported locally so this method does not rely on a module-level import.
    from concurrent.futures import ThreadPoolExecutor

    def _run_in_new_loop() -> List[Document]:
        # The worker thread has no running loop, so asyncio.run is legal here.
        return asyncio.run(self.aload())

    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(_run_in_new_loop).result()
async def aload(self) -> List[Document]:
    """Load data into Document objects.

    Counts the documents matching ``self.filter_criteria``, then iterates
    over the matching documents and turns each one into a ``Document``.

    The page content is either ``str(doc)`` (when ``self.field_names`` is
    ``None``) or the space-joined string values of the requested fields.
    Dotted field names are resolved step by step through nested mappings;
    a path that cannot be followed (missing key, or an intermediate value
    that is not a mapping, such as ``None`` or a scalar) contributes an
    empty string.

    Every document gets metadata holding the database and collection name.
    A warning is logged if the number of loaded documents differs from the
    count taken beforehand.

    Returns:
        One ``Document`` per document returned by the query.
    """
    # Imported locally so this method does not rely on a module-level import.
    from collections.abc import Mapping

    result = []
    total_docs = await self.collection.count_documents(self.filter_criteria)

    # Construct the projection dictionary if field_names are specified
    projection = (
        {field: 1 for field in self.field_names} if self.field_names else None
    )

    async for doc in self.collection.find(self.filter_criteria, projection):
        # A fresh dict per document so Documents never share metadata.
        metadata = {
            "database": self.db_name,
            "collection": self.collection_name,
        }

        # Extract text content from filtered fields or use the entire document
        if self.field_names is not None:
            fields = {}
            for name in self.field_names:
                # Split the field names to handle nested fields
                value = doc
                for key in name.split("."):
                    # Only descend into mappings: `key in value` on None or a
                    # number raises TypeError, and on a string it is a
                    # substring test followed by an invalid index.
                    if isinstance(value, Mapping) and key in value:
                        value = value[key]
                    else:
                        value = ""
                        break
                fields[name] = value

            text = " ".join(str(value) for value in fields.values())
        else:
            text = str(doc)

        result.append(Document(page_content=text, metadata=metadata))

    if len(result) != total_docs:
        logger.warning(
            f"Only partial collection of documents returned. "
            f"Loaded {len(result)} docs, expected {total_docs}."
        )

    return result