def default_joiner(docs: List[Tuple[str, Any]]) -> str:
    """Join the values of the content columns, one per line.

    Args:
        docs: ``(column name, column value)`` pairs, in the order the values
            should appear in the output. Only the value (index 1) of each
            pair is used.

    Returns:
        The column values separated by newline characters; an empty string
        when ``docs`` is empty.
    """
    # Values are typed ``Any``: coerce each one to ``str`` so that non-string
    # columns (numbers, booleans, None, ...) do not make ``str.join`` raise
    # ``TypeError``. String values pass through unchanged.
    return "\n".join(str(pair[1]) for pair in docs)
class ColumnNotFoundError(Exception):
    """Error signalling that a column was not selected in a query."""

    def __init__(self, missing_key: str, query: str):
        """Build the exception message.

        Args:
            missing_key: Name of the column that could not be found.
            query: The query that did not select the column; it is
                interpolated into the message after a line break.
        """
        message = f'Column "{missing_key}" not selected in query:\n{query}'
        super().__init__(message)
class RocksetLoader(BaseLoader):
    """Load from a `Rockset` database.

    To use, you should have the `rockset` python package installed.

    Example:
        .. code-block:: python

            # This code will load 3 records from the "langchain_demo"
            # collection as Documents, with the `text` column used as
            # the content

            from langchain_community.document_loaders import RocksetLoader
            from rockset import RocksetClient, Regions, models

            loader = RocksetLoader(
                RocksetClient(Regions.usw2a1, "<api key>"),
                models.QueryRequestSql(
                    query="select * from langchain_demo limit 3"
                ),
                ["text"]
            )
    """

    def __init__(
        self,
        client: Any,
        query: Any,
        content_keys: List[str],
        metadata_keys: Optional[List[str]] = None,
        content_columns_joiner: Callable[
            [List[Tuple[str, Any]]], str
        ] = default_joiner,
    ):
        """Initialize with Rockset client.

        Args:
            client: Rockset client object.
            query: Rockset query object.
            content_keys: The collection columns to be written into the
                `page_content` of the Documents.
            metadata_keys: The collection columns to be written into the
                `metadata` of the Documents. By default, this is all the keys
                in the document.
            content_columns_joiner: Method that joins content_keys and its
                values into a string. It is a method that takes in a
                List[Tuple[str, Any]], representing a list of tuples of
                (column name, column value). By default, this is a method
                that joins each column value with a new line. This method is
                only relevant if there are multiple content_keys.

        Raises:
            ImportError: If the `rockset` package is not installed.
            ValueError: If `client` is not a `rockset.RocksetClient` or
                `query` is not a `rockset.models.QueryRequestSql`.
        """
        # Imported lazily so the module can be imported without `rockset`.
        try:
            from rockset import QueryPaginator, RocksetClient
            from rockset.models import QueryRequestSql
        except ImportError as e:
            raise ImportError(
                "Could not import rockset client python package. "
                "Please install it with `pip install rockset`."
            ) from e

        if not isinstance(client, RocksetClient):
            raise ValueError(
                f"client should be an instance of rockset.RocksetClient, "
                f"got {type(client)}"
            )

        if not isinstance(query, QueryRequestSql):
            # The class lives in `rockset.models` (see the import above).
            raise ValueError(
                "query should be an instance of "
                f"rockset.models.QueryRequestSql, got {type(query)}"
            )

        self.client = client
        self.query = query
        self.content_keys = content_keys
        self.content_columns_joiner = content_columns_joiner
        self.metadata_keys = metadata_keys
        # The classes themselves (not instances) are kept on the loader.
        self.paginator = QueryPaginator
        self.request_model = QueryRequestSql

        # Best-effort: clients that lack `set_application` are tolerated.
        # NOTE(review): presumably this identifies langchain as the calling
        # application to Rockset -- confirm against the client docs.
        try:
            self.client.set_application("langchain")
        except AttributeError:
            pass

    def lazy_load(self) -> Iterator[Document]:
        """Run the query and yield one Document per result row.

        For each row, `page_content` is produced by calling
        `content_columns_joiner` on the `(column, value)` pairs of
        `content_keys`. `metadata` holds the `metadata_keys` columns, or the
        whole row when `metadata_keys` is None.

        Yields:
            A Document for each row of the query results.

        Raises:
            ColumnNotFoundError: If looking up a content or metadata column
                in a row raises `KeyError`.
        """
        # Execute the SQL query.
        query_results = self.client.Queries.query(sql=self.query).results

        for doc in query_results:
            # Keep only the column lookups inside the `try`, so that a
            # KeyError thrown into the suspended generator by its consumer
            # is not misreported as a missing column.
            try:
                page_content = self.content_columns_joiner(
                    [(col, doc[col]) for col in self.content_keys]
                )
                metadata = (
                    {col: doc[col] for col in self.metadata_keys}
                    if self.metadata_keys is not None
                    else doc
                )
            except KeyError as e:
                # Either content_keys or metadata_keys names a column that
                # is not present in the row; chain the original KeyError.
                raise ColumnNotFoundError(e.args[0], self.query) from e

            yield Document(page_content=page_content, metadata=metadata)