[docs]classKineticaLoader(BaseLoader):"""Load from `Kinetica` API. Each document represents one row of the result. The `page_content_columns` are written into the `page_content` of the document. The `metadata_columns` are written into the `metadata` of the document. By default, all columns are written into the `page_content` and none into the `metadata`. """
[docs]def__init__(self,query:str,host:str,username:str,password:str,parameters:Optional[Dict[str,Any]]=None,page_content_columns:Optional[List[str]]=None,metadata_columns:Optional[List[str]]=None,):"""Initialize Kinetica document loader. Args: query: The query to run in Kinetica. parameters: Optional. Parameters to pass to the query. page_content_columns: Optional. Columns written to Document `page_content`. metadata_columns: Optional. Columns written to Document `metadata`. """self.query=queryself.host=hostself.username=usernameself.password=passwordself.parameters=parametersself.page_content_columns=page_content_columnsself.metadata_columns=metadata_columnsifmetadata_columnsisnotNoneelse[]
def_execute_query(self)->List[Dict[str,Any]]:try:fromgpudbimportGPUdb,GPUdbSqlIteratorexceptImportError:raiseImportError("Could not import Kinetica python API. ""Please install it with `pip install gpudb==7.2.0.9`.")try:options=GPUdb.Options()options.username=self.usernameoptions.password=self.passwordconn=GPUdb(host=self.host,options=options)withGPUdbSqlIterator(conn,self.query)asrecords:column_names=records.type_map.keys()query_result=[dict(zip(column_names,record))forrecordinrecords]exceptExceptionase:print(f"An error occurred: {e}")# noqa: T201query_result=[]returnquery_resultdef_get_columns(self,query_result:List[Dict[str,Any]])->Tuple[List[str],List[str]]:page_content_columns=(self.page_content_columnsifself.page_content_columnselse[])metadata_columns=self.metadata_columnsifself.metadata_columnselse[]ifpage_content_columnsisNoneandquery_result:page_content_columns=list(query_result[0].keys())ifmetadata_columnsisNone:metadata_columns=[]returnpage_content_columnsor[],metadata_columns
[docs]deflazy_load(self)->Iterator[Document]:query_result=self._execute_query()ifisinstance(query_result,Exception):print(f"An error occurred during the query: {query_result}")# noqa: T201return[]# type: ignore[return-value]page_content_columns,metadata_columns=self._get_columns(query_result)if"*"inpage_content_columns:page_content_columns=list(query_result[0].keys())forrowinquery_result:page_content="\n".join(f"{k}: {v}"fork,vinrow.items()ifkinpage_content_columns)metadata={k:vfork,vinrow.items()ifkinmetadata_columns}doc=Document(page_content=page_content,metadata=metadata)yielddoc
[docs]defload(self)->List[Document]:"""Load data into document objects."""returnlist(self.lazy_load())