class CouchbaseLoader(BaseLoader):
    """Load documents from `Couchbase`.

    Each document represents one row of the result. The `page_content_fields`
    are written into the `page_content` of the document. The `metadata_fields`
    are written into the `metadata` of the document. By default, all columns
    are written into the `page_content` and none into the `metadata`.
    """

    def __init__(
        self,
        connection_string: str,
        db_username: str,
        db_password: str,
        query: str,
        *,
        page_content_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> None:
        """Initialize Couchbase document loader.

        Args:
            connection_string (str): The connection string to the Couchbase
                cluster.
            db_username (str): The username to connect to the Couchbase
                cluster.
            db_password (str): The password to connect to the Couchbase
                cluster.
            query (str): The SQL++ query to execute.
            page_content_fields (Optional[List[str]]): The columns to write
                into the `page_content` field of the document. By default,
                all columns are written.
            metadata_fields (Optional[List[str]]): The columns to write into
                the `metadata` field of the document. By default, no columns
                are written.

        Raises:
            ImportError: If the `couchbase` package cannot be imported.
            ValueError: If `connection_string`, `db_username` or
                `db_password` is empty.
        """
        # Imported lazily so the SDK is only required when this loader is
        # actually instantiated.
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            raise ImportError(
                "Could not import couchbase package. "
                "Please install couchbase SDK with `pip install couchbase`."
            ) from e

        if not connection_string:
            raise ValueError("connection_string must be provided.")

        if not db_username:
            raise ValueError("db_username must be provided.")

        if not db_password:
            raise ValueError("db_password must be provided.")

        auth = PasswordAuthenticator(
            db_username,
            db_password,
        )

        self.cluster: Cluster = Cluster(connection_string, ClusterOptions(auth))
        self.query = query
        self.page_content_fields = page_content_fields
        self.metadata_fields = metadata_fields

    def lazy_load(self) -> Iterator[Document]:
        """Load Couchbase data into Document objects lazily.

        Waits up to five seconds for the cluster to become ready, runs the
        configured query, and converts each result row into a `Document`.

        Yields:
            Document: One document per result row. `page_content` holds the
            selected fields as newline-separated ``key: value`` lines, in the
            row's own key order; `metadata` maps each configured metadata
            field to its value in the row.

        Note:
            Metadata values are read with ``row[field]``, so a configured
            metadata field that is absent from a row raises at that point
            (a `KeyError`, assuming rows are plain dicts).
        """
        from datetime import timedelta

        # Ensure connection to Couchbase cluster
        self.cluster.wait_until_ready(timedelta(seconds=5))

        # Run SQL++ Query
        result = self.cluster.query(self.query)

        # The field configuration does not depend on the row, so resolve it
        # once. An empty or missing list means "no metadata" and "all columns
        # as page content" respectively.
        metadata_fields = self.metadata_fields or []
        page_content_fields = self.page_content_fields

        for row in result:
            metadata = {field: row[field] for field in metadata_fields}

            if page_content_fields:
                selected = (
                    (key, value)
                    for key, value in row.items()
                    if key in page_content_fields
                )
            else:
                # No explicit selection: every column of the row is used.
                selected = row.items()

            document = "\n".join(f"{key}: {value}" for key, value in selected)

            yield Document(page_content=document, metadata=metadata)