class SnowflakeLoader(BaseLoader):
    """Load from `Snowflake` API.

    Each document represents one row of the result. The `page_content_columns`
    are written into the `page_content` of the document. The `metadata_columns`
    are written into the `metadata` of the document. By default, all columns
    are written into the `page_content` and none into the `metadata`.
    """

    def __init__(
        self,
        query: str,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        role: str,
        database: str,
        schema: str,
        parameters: Optional[Dict[str, Any]] = None,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
    ):
        """Initialize Snowflake document loader.

        Args:
            query: The query to run in Snowflake.
            user: Snowflake user.
            password: Snowflake password.
            account: Snowflake account.
            warehouse: Snowflake warehouse.
            role: Snowflake role.
            database: Snowflake database
            schema: Snowflake schema
            parameters: Optional. Parameters to pass to the query.
            page_content_columns: Optional. Columns written to Document
                `page_content`. Defaults to ``["*"]``, meaning every column.
            metadata_columns: Optional. Columns written to Document
                `metadata`. Defaults to no columns.
        """
        self.query = query
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.role = role
        self.database = database
        self.schema = schema
        self.parameters = parameters
        # "*" is a wildcard that lazy_load expands to all result columns.
        self.page_content_columns = (
            page_content_columns if page_content_columns is not None else ["*"]
        )
        self.metadata_columns = metadata_columns if metadata_columns is not None else []

    def _execute_query(self) -> List[Dict[str, Any]]:
        """Run the configured query and return the rows as dictionaries.

        Returns:
            One ``{column_name: value}`` dict per result row. If anything fails
            after the connection is opened, the error is printed and an empty
            list is returned (best-effort behaviour).

        Raises:
            ImportError: If `snowflake-connector-python` is not installed.
        """
        try:
            import snowflake.connector
        except ImportError as ex:
            raise ImportError(
                "Could not import snowflake-connector-python package. "
                "Please install it with `pip install snowflake-connector-python`."
            ) from ex

        conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            role=self.role,
            database=self.database,
            schema=self.schema,
            parameters=self.parameters,
        )
        # Bound before the try so the cleanup below is safe even when
        # conn.cursor() itself raises.
        cur = None
        query_result: List[Dict[str, Any]] = []
        try:
            cur = conn.cursor()
            # NOTE(review): database/schema are concatenated into the statement
            # as-is; they must come from trusted configuration.
            cur.execute("USE DATABASE " + self.database)
            cur.execute("USE SCHEMA " + self.schema)
            cur.execute(self.query, self.parameters)
            rows = cur.fetchall()
            column_names = [column[0] for column in cur.description]
            query_result = [dict(zip(column_names, row)) for row in rows]
        except Exception as e:
            print(f"An error occurred: {e}")  # noqa: T201
            query_result = []
        finally:
            # Release the cursor and the connection; the connection is closed
            # even if closing the cursor fails.
            try:
                if cur is not None:
                    cur.close()
            finally:
                conn.close()
        return query_result

    def _get_columns(
        self, query_result: List[Dict[str, Any]]
    ) -> Tuple[List[str], List[str]]:
        """Return the configured ``(page_content_columns, metadata_columns)``.

        Unset or empty settings are normalised to empty lists. The ``"*"``
        wildcard is returned untouched; it is expanded by `lazy_load`.

        Args:
            query_result: Rows returned by the query. Currently unused; kept so
                the signature stays stable for callers.
        """
        return self.page_content_columns or [], self.metadata_columns or []

    def lazy_load(self) -> Iterator[Document]:
        """Execute the query and yield one `Document` per result row.

        The full result set is fetched first; documents are then built and
        yielded one at a time. Nothing is yielded when the query returns no
        rows or fails.
        """
        query_result = self._execute_query()
        # Defensive: _execute_query returns a list, but this guard is kept in
        # case an override hands back an exception object instead.
        if isinstance(query_result, Exception):
            print(f"An error occurred during the query: {query_result}")  # noqa: T201
            return
        # No rows: stop here, since expanding "*" below needs a first row.
        if not query_result:
            return

        page_content_columns, metadata_columns = self._get_columns(query_result)
        if "*" in page_content_columns:
            page_content_columns = list(query_result[0].keys())

        # Sets give O(1) membership tests; output order follows the row's own
        # column order, so converting from lists does not change the result.
        content_keys = set(page_content_columns)
        metadata_keys = set(metadata_columns)

        for row in query_result:
            page_content = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in content_keys
            )
            metadata = {k: v for k, v in row.items() if k in metadata_keys}
            yield Document(page_content=page_content, metadata=metadata)