def __init__(
    self,
    connection_string: str,
    db_name: str,
    collection_name: str,
    *,
    filter_criteria: Optional[Dict] = None,
    field_names: Optional[Sequence[str]] = None,
    metadata_names: Optional[Sequence[str]] = None,
    include_db_collection_in_metadata: bool = True,
) -> None:
    """Set up the loader: validate arguments and open the MongoDB handles.

    Args:
        connection_string: MongoDB connection URI.
        db_name: Name of the database to connect to.
        collection_name: Name of the collection to fetch documents from.
        filter_criteria: MongoDB filter used when querying documents.
            A falsy value is stored as an empty dict.
        field_names: Names of the fields whose values make up the page
            content. A falsy value is stored as an empty list.
        metadata_names: Names of additional fields to copy into each
            document's metadata. A falsy value is stored as an empty list.
        include_db_collection_in_metadata: Whether the database and
            collection names are added to each document's metadata.

    Raises:
        ImportError: If the ``motor`` library is not installed.
        ValueError: If ``connection_string``, ``db_name`` or
            ``collection_name`` is empty.
    """
    # Imported lazily so the module can be imported without motor installed.
    try:
        from motor.motor_asyncio import AsyncIOMotorClient
    except ImportError as exc:
        raise ImportError(
            "Cannot import from motor, please install with `pip install motor`."
        ) from exc

    # The three positional arguments are mandatory and must be non-empty.
    if not connection_string:
        raise ValueError("connection_string must be provided.")
    if not db_name:
        raise ValueError("db_name must be provided.")
    if not collection_name:
        raise ValueError("collection_name must be provided.")

    self.client = AsyncIOMotorClient(connection_string)

    # Plain configuration; optional arguments fall back to empty containers.
    self.db_name = db_name
    self.collection_name = collection_name
    self.field_names = field_names if field_names else []
    self.filter_criteria = filter_criteria if filter_criteria else {}
    self.metadata_names = metadata_names if metadata_names else []
    self.include_db_collection_in_metadata = include_db_collection_in_metadata

    # Handles used by the query methods.
    database = self.client.get_database(db_name)
    self.db = database
    self.collection = database.get_collection(collection_name)
def load(self) -> List[Document]:
    """Synchronously load the collection into Document objects.

    Attention:
        This drives ``aload`` through ``asyncio.run``, which starts its own
        event loop. That only works when called from synchronous code; from
        inside an already-running event loop it is expected to fail. To
        support async callers, the loop would need to be started from a
        separate thread instead.
    """
    pending = self.aload()
    return asyncio.run(pending)
async def aload(self) -> List[Document]:
    """Asynchronously load the matching MongoDB documents as Document objects.

    Each document found with ``self.filter_criteria`` becomes one
    ``Document``:

    * ``metadata`` holds the fields listed in ``self.metadata_names``
      (missing fields become ``""``), plus the database and collection names
      when ``self.include_db_collection_in_metadata`` is set.
    * ``page_content`` is the space-joined string values of the fields in
      ``self.field_names``; when no field names are configured it is
      ``str(doc)`` of the document as returned by the query.

    Returns:
        The list of loaded documents. A warning is logged when its length
        differs from the count reported by ``count_documents``.
    """
    result = []
    total_docs = await self.collection.count_documents(self.filter_criteria)
    projection = self._construct_projection()

    async for doc in self.collection.find(self.filter_criteria, projection):
        metadata = self._extract_fields(doc, self.metadata_names, default="")

        # Optionally add database and collection names to metadata
        if self.include_db_collection_in_metadata:
            metadata.update(
                {
                    "database": self.db_name,
                    "collection": self.collection_name,
                }
            )

        # Extract text content from filtered fields or use the entire document.
        # Truthiness (not ``is not None``) is required here: the constructor
        # normalises a missing ``field_names`` to ``[]``, so an identity check
        # against None would never reach the whole-document fallback and
        # would yield empty page content.
        if self.field_names:
            fields = self._extract_fields(doc, self.field_names, default="")
            text = " ".join(str(value) for value in fields.values())
        else:
            text = str(doc)

        result.append(Document(page_content=text, metadata=metadata))

    # The count and the find are separate queries, so they can disagree.
    if len(result) != total_docs:
        logger.warning(
            f"Only partial collection of documents returned. "
            f"Loaded {len(result)} docs, expected {total_docs}."
        )

    return result
def_construct_projection(self)->Optional[Dict]:"""Constructs the projection dictionary for MongoDB query based on the specified field names and metadata names."""field_names=list(self.field_names)or[]metadata_names=list(self.metadata_names)or[]all_fields=field_names+metadata_namesreturn{field:1forfieldinall_fields}ifall_fieldselseNonedef_extract_fields(self,document:Dict,fields:Sequence[str],default:str="",)->Dict:"""Extracts and returns values for specified fields from a document."""extracted={}forfieldinfieldsor[]:value=documentforkeyinfield.split("."):value=value.get(key,default)ifvalue==default:breaknew_field_name=field.replace(".","_")extracted[new_field_name]=valuereturnextracted