class GlueCatalogLoader(BaseLoader):
    """Load table schemas from AWS Glue.

    This loader fetches the schema of each table within a specified AWS Glue
    database. The schema details include column names and their data types,
    similar to pandas dtype representation.

    AWS credentials are automatically loaded using boto3, following the
    standard AWS method:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific AWS profile is required, it can be specified and will be
    used to establish the session.
    """

    def __init__(
        self,
        database: str,
        *,
        session: Optional[Session] = None,
        profile_name: Optional[str] = None,
        table_filter: Optional[List[str]] = None,
    ) -> None:
        """Initialize Glue database loader.

        Args:
            database: The name of the Glue database from which to load table
                schemas.
            session: Optional. A boto3 Session object. If not provided, a new
                session will be created. When provided, ``profile_name`` is
                stored but not used to build the client.
            profile_name: Optional. The name of the AWS profile to use for
                credentials.
            table_filter: Optional. List of table names to fetch schemas for,
                fetching all if None.
        """
        self.database = database
        self.profile_name = profile_name
        self.table_filter = table_filter

        # Compare against None explicitly so the choice does not depend on
        # the truthiness of whatever session object the caller hands in.
        if session is not None:
            self.glue_client = session.client("glue")
        else:
            self.glue_client = self._initialize_glue_client()

    def _initialize_glue_client(self) -> Any:
        """Initialize the AWS Glue client.

        Returns:
            The initialized AWS Glue client.

        Raises:
            ImportError: If boto3 is not installed.
            ValueError: If there is an issue with AWS session/client
                initialization.
        """
        # boto3 is imported lazily so the module can be imported without it.
        try:
            import boto3
        except ImportError as e:
            raise ImportError(
                "boto3 is required to use the GlueCatalogLoader. "
                "Please install it with `pip install boto3`."
            ) from e

        try:
            session = (
                boto3.Session(profile_name=self.profile_name)
                if self.profile_name
                else boto3.Session()
            )
            return session.client("glue")
        except Exception as e:
            # Any session/client construction failure is surfaced as a
            # ValueError, with the original exception chained as the cause.
            raise ValueError("Issue with AWS session/client initialization.") from e

    def _fetch_tables(self) -> List[str]:
        """Retrieve the table names in the specified Glue database.

        When ``table_filter`` is set, only names present in it are returned;
        filter entries that match no table are silently ignored.

        Returns:
            A list of table names, in the order the paginator yields them.
        """
        # Build the lookup set once instead of scanning the filter list for
        # every table on every page.
        allowed = None if self.table_filter is None else set(self.table_filter)

        paginator = self.glue_client.get_paginator("get_tables")
        table_names: List[str] = []
        for page in paginator.paginate(DatabaseName=self.database):
            for table in page["TableList"]:
                name = table["Name"]
                if allowed is None or name in allowed:
                    table_names.append(name)
        return table_names

    def _fetch_table_schema(self, table_name: str) -> Dict[str, str]:
        """Fetch the schema of a specified table.

        Only the columns listed under the table's ``StorageDescriptor`` are
        read. A table with no storage descriptor (or no columns) yields an
        empty mapping instead of raising ``KeyError``.

        Args:
            table_name: The name of the table for which to fetch the schema.

        Returns:
            A dictionary mapping column names to their data types. A column
            without a declared type maps to an empty string.
        """
        response = self.glue_client.get_table(
            DatabaseName=self.database, Name=table_name
        )
        # The Glue API does not mark StorageDescriptor, Columns, or a
        # column's Type as required, so look them up tolerantly.
        storage_descriptor = response["Table"].get("StorageDescriptor") or {}
        columns = storage_descriptor.get("Columns") or []
        return {col["Name"]: col.get("Type", "") for col in columns}

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load table schemas as Document objects.

        Table names are listed up front; each table's schema is then fetched
        only when its Document is about to be yielded.

        Yields:
            Document objects, each representing the schema of a table. The
            page content lists the database, the table and one
            ``column: type`` line per column; the metadata carries
            ``table_name``.
        """
        table_names = self._fetch_tables()
        for table_name in table_names:
            schema = self._fetch_table_schema(table_name)
            page_content = (
                f"Database: {self.database}\nTable: {table_name}\nSchema:\n"
                + "\n".join(f"{col}: {dtype}" for col, dtype in schema.items())
            )
            doc = Document(
                page_content=page_content, metadata={"table_name": table_name}
            )
            yield doc