class DocumentStorage(BaseStore[str, Document]):
    """Abstract interface of a key, text storage for retrieving documents.

    Concrete subclasses persist ``Document`` objects addressed by string
    keys (see the ``BaseStore`` contract: ``mget``/``mset``/``mdelete``/
    ``yield_keys``).
    """
class GCSDocumentStorage(DocumentStorage):
    """Stores documents in Google Cloud Storage.

    For each pair id, document_text the name of the blob will be {prefix}/{id}
    stored in plain text format.
    """

    def __init__(
        self,
        bucket: storage.Bucket,
        prefix: Optional[str] = "documents",
        threaded: bool = True,
        n_threads: int = 8,
    ) -> None:
        """Constructor.

        Args:
            bucket: Bucket where the documents will be stored.
            prefix: Prefix that is prepended to all document names.
            threaded: If True, use the GCS transfer manager to upload and
                download blobs with a thread pool; otherwise operate on one
                blob at a time.
            n_threads: Number of worker threads used when ``threaded`` is
                True. Must be between 1 and 50 inclusive.

        Raises:
            ValueError: If ``threaded`` is True and ``n_threads`` is outside
                the range [1, 50].
        """
        super().__init__()
        # Validate before assigning any state so a failed construction does
        # not first perform side-effecting assignments.
        if threaded and not (int(n_threads) > 0 and int(n_threads) <= 50):
            raise ValueError(
                "n_threads must be a valid integer,"
                " greater than 0 and lower than or equal to 50"
            )
        self._bucket = bucket
        self._prefix = prefix
        self._threaded = threaded
        self._n_threads = n_threads

    def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
        """Stores a series of documents using each keys

        Args:
            key_value_pairs (Sequence[Tuple[K, V]]): A sequence of key-value
                pairs.
        """
        if not self._threaded:
            for key, value in key_value_pairs:
                self._set_one(key, value)
            return
        results = transfer_manager.upload_many(
            [
                self._prepare_doc_for_bulk_upload(key, value)
                for key, value in key_value_pairs
            ],
            skip_if_exists=False,
            upload_kwargs=None,
            deadline=None,
            raise_exception=False,
            worker_type="thread",
            max_workers=self._n_threads,
        )
        # The results list holds, per input item and in order, either None on
        # success or the exception raised for that item; surface the first
        # failure.
        for result in results:
            if isinstance(result, Exception):
                raise result

    def _convert_bytes_to_doc(
        self, doc: io.BytesIO, result: Any
    ) -> Union[Document, None]:
        """Deserialize one downloaded blob buffer into a ``Document``.

        Args:
            doc: Buffer the blob contents were downloaded into.
            result: Per-blob outcome from ``transfer_manager.download_many``:
                None on success, or the exception raised for that blob.

        Returns:
            The decoded ``Document``, or None when the blob was not found.

        Raises:
            Exception: If ``result`` is an exception other than ``NotFound``.
        """
        if isinstance(result, NotFound):
            # Missing blob maps to a None entry in mget's output.
            return None
        if result is None:
            doc.seek(0)
            raw_doc = doc.read()
            data = raw_doc.decode("utf-8")
            data_json = json.loads(data)
            return Document(**data_json)
        raise Exception(
            "Unexpected result type when batch getting multiple files from GCS"
        )

    def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
        """Gets a batch of documents by id.

        Args:
            keys: List of ids for the text.

        Returns:
            List of documents. If the key id is not found for any id record
            returns a None instead.
        """
        if not self._threaded:
            return [self._get_one(key) for key in keys]
        download_docs = [
            (self._bucket.blob(self._get_blob_name(key)), io.BytesIO())
            for key in keys
        ]
        download_results = transfer_manager.download_many(
            download_docs,
            skip_if_exists=False,
            download_kwargs=None,
            deadline=None,
            raise_exception=False,
            worker_type="thread",
            max_workers=self._n_threads,
        )
        # NotFound becomes a None entry below; any other exception is a real
        # failure and is re-raised.
        for result in download_results:
            if isinstance(result, Exception) and not isinstance(result, NotFound):
                raise result
        return [
            self._convert_bytes_to_doc(buffer, result)
            for (_, buffer), result in zip(download_docs, download_results)
        ]

    def mdelete(self, keys: Sequence[str]) -> None:
        """Deletes a batch of documents by id.

        Args:
            keys: List of ids for the text.
        """
        # GCS batch requests are size-capped, so delete in chunks.
        for i in range(0, len(keys), GCS_MAX_BATCH_SIZE):
            batch = keys[i : i + GCS_MAX_BATCH_SIZE]
            with self._bucket.client.batch():
                for key in batch:
                    self._delete_one(key)

    def yield_keys(self, *, prefix: str | None = None) -> Iterator[str]:
        """Yields the keys present in the storage.

        Args:
            prefix: Ignored. Uses the prefix provided in the constructor.
        """
        for blob in self._bucket.list_blobs(prefix=self._prefix):
            yield blob.name.split("/")[-1]

    def _get_one(self, key: str) -> Document | None:
        """Gets the text of a document by its id. If not found, returns None.

        Args:
            key: Id of the document to get from the storage.

        Returns:
            Document if found, otherwise None.
        """
        blob_name = self._get_blob_name(key)
        existing_blob = self._bucket.get_blob(blob_name)
        if existing_blob is None:
            return None
        document_str = existing_blob.download_as_text()
        document_json: Dict[str, Any] = json.loads(document_str)
        return Document(**document_json)

    def _set_one(self, key: str, value: Document) -> None:
        """Stores a document text associated to a document_id.

        Args:
            key: Id of the document to be stored.
            value: Document to be stored.
        """
        blob_name = self._get_blob_name(key)
        new_blob = self._bucket.blob(blob_name)
        document_json = value.dict()
        document_text = json.dumps(document_json)
        new_blob.upload_from_string(document_text)

    def _delete_one(self, key: str) -> None:
        """Deletes one document by its key.

        Args:
            key (str): Id of the document to delete.
        """
        blob_name = self._get_blob_name(key)
        blob = self._bucket.blob(blob_name)
        blob.delete()

    def _get_blob_name(self, document_id: str) -> str:
        """Builds a blob name using the prefix and the document_id.

        Args:
            document_id: Id of the document.

        Returns:
            Name of the blob that the document will be/is stored in
        """
        return f"{self._prefix}/{document_id}"
class DataStoreDocumentStorage(DocumentStorage):
    """Stores documents in Google Cloud DataStore."""

    def __init__(
        self,
        datastore_client: datastore.Client,
        kind: str = "document_id",
        text_property_name: str = "text",
        metadata_property_name: str = "metadata",
    ) -> None:
        """Constructor.

        Args:
            datastore_client: Client used to read and write to Datastore.
            kind: Datastore kind under which document entities are stored.
            text_property_name: Entity property that holds the document's
                page content.
            metadata_property_name: Entity property that holds the document's
                metadata dictionary.
        """
        super().__init__()
        self._client = datastore_client
        self._text_property_name = text_property_name
        self._metadata_property_name = metadata_property_name
        self._kind = kind

    def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
        """Gets a batch of documents by id.

        Args:
            keys: List of ids for the text.

        Returns:
            List of documents. If the key id is not found for any id record
            returns a None instead.
        """
        ds_keys = [self._client.key(self._kind, id_) for id_ in keys]
        entities = self._client.get_multi(ds_keys)
        # get_multi does not guarantee output order; re-order results so they
        # line up with the requested keys, with None for missing ids.
        entity_id_lookup = {entity.key.id_or_name: entity for entity in entities}
        ordered_entities = [entity_id_lookup.get(id_) for id_ in keys]
        return [
            Document(
                page_content=entity[self._text_property_name],
                metadata=self._convert_entity_to_dict(
                    entity[self._metadata_property_name]
                ),
            )
            if entity is not None
            else None
            for entity in ordered_entities
        ]

    def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
        """Stores a series of documents using each keys

        Args:
            key_value_pairs (Sequence[Tuple[K, V]]): A sequence of key-value
                pairs.
        """
        with self._client.transaction():
            entities = []
            for id_, document in key_value_pairs:
                entity = self._client.entity(
                    key=self._client.key(self._kind, id_)
                )
                entity[self._text_property_name] = document.page_content
                entity[self._metadata_property_name] = document.metadata
                entities.append(entity)
            self._client.put_multi(entities)

    def mdelete(self, keys: Sequence[str]) -> None:
        """Deletes a sequence of documents by key.

        Args:
            keys (Sequence[str]): A sequence of keys to delete.
        """
        with self._client.transaction():
            ds_keys = [self._client.key(self._kind, id_) for id_ in keys]
            self._client.delete_multi(ds_keys)

    def yield_keys(self, *, prefix: str | None = None) -> Iterator[str]:
        """Yields the keys of all documents in the storage.

        Args:
            prefix: Ignored
        """
        query = self._client.query(kind=self._kind)
        query.keys_only()
        for entity in query.fetch():
            yield str(entity.key.id_or_name)

    def _convert_entity_to_dict(self, entity: datastore.Entity) -> Dict[str, Any]:
        """Recursively transform an entity into a plain dictionary."""
        from google.cloud import datastore  # type: ignore[attr-defined, unused-ignore]

        dict_entity = dict(entity)
        for key in dict_entity:
            value = dict_entity[key]
            # Nested entities are themselves mappings; flatten them too.
            if isinstance(value, datastore.Entity):
                dict_entity[key] = self._convert_entity_to_dict(value)
        return dict_entity