# Based on https://github.com/langchain-ai/langchain/blob/edbe7d5f5e0dcc771c1f53a49bb784a3960ce448/libs/community/langchain_community/document_loaders/mongodb.py
from __future__ import annotations

import logging
from collections.abc import Mapping
from importlib.metadata import version
from typing import Dict, List, Optional, Sequence

from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.runnables.config import run_in_executor
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.driver_info import DriverInfo

logger = logging.getLogger(__name__)

class MongoDBLoader(BaseLoader):
    """Load the documents of a MongoDB collection as ``Document`` objects.

    Every MongoDB document matching ``filter_criteria`` becomes one
    ``Document``. Its ``page_content`` is the space-joined string form of the
    values of ``field_names`` (or ``str()`` of the whole MongoDB document when
    no field names are configured), and its ``metadata`` holds the values of
    ``metadata_names`` plus, optionally, the database and collection names.
    """

    def __init__(
        self,
        collection: Collection,
        *,
        filter_criteria: Optional[Dict] = None,
        field_names: Optional[Sequence[str]] = None,
        metadata_names: Optional[Sequence[str]] = None,
        include_db_collection_in_metadata: bool = True,
    ) -> None:
        """Initialize the loader from an existing pymongo collection.

        Args:
            collection (Collection): The pymongo collection to fetch
                documents from.
            filter_criteria (Optional[Dict]): MongoDB filter criteria for
                querying documents. ``None`` matches every document.
            field_names (Optional[Sequence[str]]): Field names whose values
                form the page content. Dotted names address nested fields.
                When ``None`` or empty, the whole document is used.
            metadata_names (Optional[Sequence[str]]): Additional metadata
                fields to extract from documents.
            include_db_collection_in_metadata (bool): Flag to include
                database and collection names in metadata.
        """
        self.collection = collection
        self.db = collection.database
        self.db_name = self.db.name
        self.collection_name = collection.name
        # ``None`` is normalised to an empty container so the rest of the
        # class can iterate these attributes without None checks.
        self.field_names = field_names or []
        self.filter_criteria = filter_criteria or {}
        self.metadata_names = metadata_names or []
        self.include_db_collection_in_metadata = include_db_collection_in_metadata

    @classmethod
    def from_connection_string(
        cls,
        connection_string: str,
        db_name: str,
        collection_name: str,
        *,
        filter_criteria: Optional[Dict] = None,
        field_names: Optional[Sequence[str]] = None,
        metadata_names: Optional[Sequence[str]] = None,
        include_db_collection_in_metadata: bool = True,
    ) -> MongoDBLoader:
        """Create a loader by opening a new client from a connection URI.

        The client created here is owned by the returned loader; call
        ``close()`` when finished with it.

        Args:
            connection_string (str): MongoDB connection URI.
            db_name (str): Name of the database to connect to.
            collection_name (str): Name of the collection to fetch
                documents from.
            filter_criteria (Optional[Dict]): MongoDB filter criteria for
                querying documents.
            field_names (Optional[Sequence[str]]): List of field names to
                retrieve from documents.
            metadata_names (Optional[Sequence[str]]): Additional metadata
                fields to extract from documents.
            include_db_collection_in_metadata (bool): Flag to include
                database and collection names in metadata.

        Returns:
            A loader bound to ``client[db_name][collection_name]``.
        """
        client = MongoClient(
            connection_string,
            driver=DriverInfo(name="Langchain", version=version("langchain-mongodb")),
        )
        collection = client[db_name][collection_name]
        # Instantiate through ``cls`` so subclasses get an instance of their
        # own type rather than a bare MongoDBLoader.
        return cls(
            collection,
            filter_criteria=filter_criteria,
            field_names=field_names,
            metadata_names=metadata_names,
            include_db_collection_in_metadata=include_db_collection_in_metadata,
        )

    def close(self) -> None:
        """Close the resources used by the MongoDBLoader."""
        self.db.client.close()

    def load(self) -> List[Document]:
        """Load data into Document objects.

        Returns:
            One ``Document`` per MongoDB document matching
            ``filter_criteria``. A warning is logged when the number of
            documents read differs from the count taken just before the
            query.
        """
        result = []
        # The count and the find are separate queries, so concurrent writes
        # can make them disagree; that is reported below, not treated as fatal.
        total_docs = self.collection.count_documents(self.filter_criteria)
        projection = self._construct_projection()

        for doc in self.collection.find(self.filter_criteria, projection):
            metadata = self._extract_fields(doc, self.metadata_names, default="")

            # Optionally add database and collection names to metadata
            if self.include_db_collection_in_metadata:
                metadata.update(
                    {"database": self.db_name, "collection": self.collection_name}
                )

            # Extract text content from the selected fields, or use the
            # entire document. ``field_names`` is never None after __init__,
            # so emptiness (not None-ness) is what selects the fallback.
            if self.field_names:
                fields = self._extract_fields(doc, self.field_names, default="")
                text = " ".join(str(value) for value in fields.values())
            else:
                text = str(doc)

            result.append(Document(page_content=text, metadata=metadata))

        if len(result) != total_docs:
            logger.warning(
                "Only partial collection of documents returned. "
                "Loaded %d docs, expected %d.",
                len(result),
                total_docs,
            )

        return result

    async def aload(self) -> List[Document]:
        """Asynchronously load data into Document objects.

        Delegates to the synchronous ``load`` via ``run_in_executor``.
        """
        return await run_in_executor(None, self.load)

    def _construct_projection(self) -> Optional[Dict]:
        """Build the projection for the MongoDB query.

        Returns:
            ``{field: 1, ...}`` covering the content and metadata fields, or
            ``None`` (no projection) when no content fields are configured,
            because ``load`` then needs the whole document for the page
            content.
        """
        if not self.field_names:
            return None
        all_fields = list(self.field_names) + list(self.metadata_names)
        return {field: 1 for field in all_fields}

    def _extract_fields(
        self,
        document: Dict,
        fields: Sequence[str],
        default: str = "",
    ) -> Dict:
        """Extract the values of the specified fields from a document.

        Args:
            document: The MongoDB document to read from.
            fields: Field names to extract; a dotted name such as ``"a.b"``
                walks into nested sub-documents.
            default: Value used when a field (or any step of a dotted path)
                is missing or cannot be traversed.

        Returns:
            A dict keyed by the field names with ``.`` replaced by ``_``.
        """
        extracted = {}
        for field in fields or []:
            value = document
            for key in field.split("."):
                # A dotted path may run into a scalar or a list; treat that
                # as "missing" instead of failing on ``.get``.
                if not isinstance(value, Mapping):
                    value = default
                    break
                value = value.get(key, default)
                if value == default:
                    break
            extracted[field.replace(".", "_")] = value
        return extracted