class TiDBLoader(BaseLoader):
    """Load documents from TiDB."""

    def __init__(
        self,
        connection_string: str,
        query: str,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
        engine_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize TiDB document loader.

        Args:
            connection_string: The connection string for the TiDB database,
                format: "mysql+pymysql://root@127.0.0.1:4000/test".
            query: The query to run in TiDB.
            page_content_columns: Optional. Columns written to Document
                `page_content`. Defaults to None, which means all columns.
            metadata_columns: Optional. Columns written to Document
                `metadata`. Defaults to None, which means no columns.
            engine_args: Optional. Additional keyword arguments to pass to
                the sqlalchemy engine.
        """
        self.connection_string = connection_string
        self.query = query
        self.page_content_columns = page_content_columns
        # Normalize None to an empty list so lazy_load can iterate it directly.
        self.metadata_columns = metadata_columns if metadata_columns is not None else []
        self.engine_args = engine_args

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load TiDB data into document objects.

        Runs `self.query` and yields one Document per result row. The
        `page_content` of each Document is the selected columns rendered as
        "column: value" lines joined by newlines; `metadata` maps each name
        in `self.metadata_columns` to that row's value.

        Yields:
            One Document per row returned by the query.

        Raises:
            ImportError: If sqlalchemy is not installed.
            KeyError: If a name in `metadata_columns` is not a column of the
                query result.
        """
        # Imported lazily so sqlalchemy is only required when loading.
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.sql import text

        # use sqlalchemy to create db connection
        engine: Engine = create_engine(
            self.connection_string, **(self.engine_args or {})
        )

        # Loop-invariant attribute reads, hoisted out of the row loop.
        content_columns = self.page_content_columns
        metadata_columns = self.metadata_columns

        try:
            # execute query
            with engine.connect() as conn:
                result = conn.execute(text(self.query))

                # convert result to Document objects
                column_names = list(result.keys())
                for row in result:
                    # convert row to dict{column: value}
                    row_data = dict(zip(column_names, row))
                    page_content = "\n".join(
                        f"{k}: {v}"
                        for k, v in row_data.items()
                        if content_columns is None or k in content_columns
                    )
                    metadata = {col: row_data[col] for col in metadata_columns}
                    yield Document(page_content=page_content, metadata=metadata)
        finally:
            # The engine is created per call, so release its connection pool
            # here. This also runs when the consumer stops iterating early and
            # the generator is closed.
            engine.dispose()