Source code for langchain_community.utilities.sql_database
"""SQLAlchemy wrapper around a database."""from__future__importannotationsfromtypingimportAny,Dict,Iterable,List,Literal,Optional,Sequence,Unionimportsqlalchemyfromlangchain_core._apiimportdeprecatedfromlangchain_core.utilsimportget_from_envfromsqlalchemyimport(MetaData,Table,create_engine,inspect,select,text,)fromsqlalchemy.engineimportURL,Engine,Resultfromsqlalchemy.excimportProgrammingError,SQLAlchemyErrorfromsqlalchemy.schemaimportCreateTablefromsqlalchemy.sql.expressionimportExecutablefromsqlalchemy.typesimportNullTypedef_format_index(index:sqlalchemy.engine.interfaces.ReflectedIndex)->str:return(f'Name: {index["name"]}, Unique: {index["unique"]},'f' Columns: {str(index["column_names"])}')
def truncate_word(content: Any, *, length: int, suffix: str = "...") -> str:
    """
    Truncate a string to a certain number of words, based on the max string
    length.
    """

    if not isinstance(content, str) or length <= 0:
        return content

    if len(content) <= length:
        return content

    return content[: length - len(suffix)].rsplit(" ", 1)[0] + suffix
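# Illustrative sketch (not part of the module): `truncate_word` cuts on a word
# boundary and appends the suffix when the input exceeds `length`; non-string
# values and non-positive lengths are returned unchanged.
#
#     truncate_word("The quick brown fox jumps over the lazy dog", length=20)
#     # -> 'The quick brown...'
#     truncate_word(12345, length=10)
#     # -> 12345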
class SQLDatabase:
    """SQLAlchemy wrapper around a database."""
    def __init__(
        self,
        engine: Engine,
        schema: Optional[str] = None,
        metadata: Optional[MetaData] = None,
        ignore_tables: Optional[List[str]] = None,
        include_tables: Optional[List[str]] = None,
        sample_rows_in_table_info: int = 3,
        indexes_in_table_info: bool = False,
        custom_table_info: Optional[dict] = None,
        view_support: bool = False,
        max_string_length: int = 300,
        lazy_table_reflection: bool = False,
    ):
        """Create engine from database URI."""
        self._engine = engine
        self._schema = schema
        if include_tables and ignore_tables:
            raise ValueError("Cannot specify both include_tables and ignore_tables")

        self._inspector = inspect(self._engine)

        # including view support by adding the views as well as tables to the all
        # tables list if view_support is True
        self._all_tables = set(
            self._inspector.get_table_names(schema=schema)
            + (self._inspector.get_view_names(schema=schema) if view_support else [])
        )

        self._include_tables = set(include_tables) if include_tables else set()
        if self._include_tables:
            missing_tables = self._include_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"include_tables {missing_tables} not found in database"
                )
        self._ignore_tables = set(ignore_tables) if ignore_tables else set()
        if self._ignore_tables:
            missing_tables = self._ignore_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"ignore_tables {missing_tables} not found in database"
                )
        usable_tables = self.get_usable_table_names()
        self._usable_tables = set(usable_tables) if usable_tables else self._all_tables

        if not isinstance(sample_rows_in_table_info, int):
            raise TypeError("sample_rows_in_table_info must be an integer")

        self._sample_rows_in_table_info = sample_rows_in_table_info
        self._indexes_in_table_info = indexes_in_table_info

        self._custom_table_info = custom_table_info
        if self._custom_table_info:
            if not isinstance(self._custom_table_info, dict):
                raise TypeError(
                    "table_info must be a dictionary with table names as keys and the "
                    "desired table info as values"
                )
            # only keep the tables that are also present in the database
            intersection = set(self._custom_table_info).intersection(self._all_tables)
            self._custom_table_info = dict(
                (table, self._custom_table_info[table])
                for table in self._custom_table_info
                if table in intersection
            )

        self._max_string_length = max_string_length
        self._view_support = view_support

        self._metadata = metadata or MetaData()
        if not lazy_table_reflection:
            # including view support if view_support = true
            self._metadata.reflect(
                views=view_support,
                bind=self._engine,
                only=list(self._usable_tables),
                schema=self._schema,
            )
    @classmethod
    def from_uri(
        cls,
        database_uri: Union[str, URL],
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> SQLDatabase:
        """Construct a SQLAlchemy engine from URI."""
        _engine_args = engine_args or {}
        return cls(create_engine(database_uri, **_engine_args), **kwargs)
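    # Illustrative sketch (not part of the module): `from_uri` is the usual entry
    # point and accepts any SQLAlchemy URL plus optional `create_engine` arguments.
    # The URI, engine_args, and sample-row count below are assumptions for the
    # example, not defaults of this class.
    #
    #     db = SQLDatabase.from_uri(
    #         "sqlite:///example.db",
    #         engine_args={"echo": False},
    #         sample_rows_in_table_info=2,
    #     )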
    @classmethod
    def from_databricks(
        cls,
        catalog: str,
        schema: str,
        host: Optional[str] = None,
        api_token: Optional[str] = None,
        warehouse_id: Optional[str] = None,
        cluster_id: Optional[str] = None,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> SQLDatabase:
        """
        Class method to create an SQLDatabase instance from a Databricks connection.

        This method requires the 'databricks-sql-connector' package. If not installed,
        it can be added using `pip install databricks-sql-connector`.

        Args:
            catalog (str): The catalog name in the Databricks database.
            schema (str): The schema name in the catalog.
            host (Optional[str]): The Databricks workspace hostname, excluding
                'https://' part. If not provided, it attempts to fetch from the
                environment variable 'DATABRICKS_HOST'. If still unavailable and if
                running in a Databricks notebook, it defaults to the current workspace
                hostname. Defaults to None.
            api_token (Optional[str]): The Databricks personal access token for
                accessing the Databricks SQL warehouse or the cluster. If not
                provided, it attempts to fetch from 'DATABRICKS_TOKEN'. If still
                unavailable and running in a Databricks notebook, a temporary token
                for the current user is generated. Defaults to None.
            warehouse_id (Optional[str]): The warehouse ID in the Databricks SQL. If
                provided, the method configures the connection to use this warehouse.
                Cannot be used with 'cluster_id'. Defaults to None.
            cluster_id (Optional[str]): The cluster ID in the Databricks Runtime. If
                provided, the method configures the connection to use this cluster.
                Cannot be used with 'warehouse_id'. If running in a Databricks
                notebook and both 'warehouse_id' and 'cluster_id' are None, it uses
                the ID of the cluster the notebook is attached to. Defaults to None.
            engine_args (Optional[dict]): The arguments to be used when connecting
                Databricks. Defaults to None.
            **kwargs (Any): Additional keyword arguments for the `from_uri` method.

        Returns:
            SQLDatabase: An instance of SQLDatabase configured with the provided
                Databricks connection details.

        Raises:
            ValueError: If 'databricks-sql-connector' is not found, or if both
                'warehouse_id' and 'cluster_id' are provided, or if neither
                'warehouse_id' nor 'cluster_id' are provided and it's not executing
                inside a Databricks notebook.
        """
        try:
            from databricks import sql  # noqa: F401
        except ImportError:
            raise ImportError(
                "databricks-sql-connector package not found, please install with"
                " `pip install databricks-sql-connector`"
            )
        context = None
        try:
            from dbruntime.databricks_repl_context import get_context

            context = get_context()
            default_host = context.browserHostName
        except (ImportError, AttributeError):
            default_host = None

        if host is None:
            host = get_from_env("host", "DATABRICKS_HOST", default_host)

        default_api_token = context.apiToken if context else None
        if api_token is None:
            api_token = get_from_env(
                "api_token", "DATABRICKS_TOKEN", default_api_token
            )

        if warehouse_id is None and cluster_id is None:
            if context:
                cluster_id = context.clusterId
            else:
                raise ValueError(
                    "Need to provide either 'warehouse_id' or 'cluster_id'."
                )

        if warehouse_id and cluster_id:
            raise ValueError("Can't have both 'warehouse_id' or 'cluster_id'.")

        if warehouse_id:
            http_path = f"/sql/1.0/warehouses/{warehouse_id}"
        else:
            http_path = f"/sql/protocolv1/o/0/{cluster_id}"

        uri = (
            f"databricks://token:{api_token}@{host}?"
            f"http_path={http_path}&catalog={catalog}&schema={schema}"
        )
        return cls.from_uri(database_uri=uri, engine_args=engine_args, **kwargs)
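    # Illustrative sketch (not part of the module): connecting to a Databricks SQL
    # warehouse, with the host and token resolved from DATABRICKS_HOST and
    # DATABRICKS_TOKEN when omitted. The catalog, schema, and warehouse_id values
    # are placeholders, not real identifiers.
    #
    #     db = SQLDatabase.from_databricks(
    #         catalog="my_catalog",
    #         schema="my_schema",
    #         warehouse_id="<warehouse-id>",
    #     )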
    @classmethod
    def from_cnosdb(
        cls,
        url: str = "127.0.0.1:8902",
        user: str = "root",
        password: str = "",
        tenant: str = "cnosdb",
        database: str = "public",
    ) -> SQLDatabase:
        """
        Class method to create an SQLDatabase instance from a CnosDB connection.

        This method requires the 'cnos-connector' package. If not installed, it
        can be added using `pip install cnos-connector`.

        Args:
            url (str): The HTTP connection host name and port number of the CnosDB
                service, excluding "http://" or "https://", with a default value
                of "127.0.0.1:8902".
            user (str): The username used to connect to the CnosDB service, with a
                default value of "root".
            password (str): The password of the user connecting to the CnosDB
                service, with a default value of "".
            tenant (str): The name of the tenant used to connect to the CnosDB
                service, with a default value of "cnosdb".
            database (str): The name of the database in the CnosDB tenant.

        Returns:
            SQLDatabase: An instance of SQLDatabase configured with the provided
                CnosDB connection details.
        """
        try:
            from cnosdb_connector import make_cnosdb_langchain_uri

            uri = make_cnosdb_langchain_uri(url, user, password, tenant, database)
            return cls.from_uri(database_uri=uri)
        except ImportError:
            raise ImportError(
                "cnos-connector package not found, please install with"
                " `pip install cnos-connector`"
            )
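    # Illustrative sketch (not part of the module): for a local CnosDB instance the
    # documented defaults are enough, so a bare call is typical; every argument
    # shown is simply the default from the signature above.
    #
    #     db = SQLDatabase.from_cnosdb(
    #         url="127.0.0.1:8902", user="root", tenant="cnosdb", database="public"
    #     )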
    @property
    def dialect(self) -> str:
        """Return string representation of dialect to use."""
        return self._engine.dialect.name
    def get_usable_table_names(self) -> Iterable[str]:
        """Get names of tables available."""
        if self._include_tables:
            return sorted(self._include_tables)
        return sorted(self._all_tables - self._ignore_tables)
    @deprecated("0.0.1", alternative="get_usable_table_names", removal="1.0")
    def get_table_names(self) -> Iterable[str]:
        """Get names of tables available."""
        return self.get_usable_table_names()
    @property
    def table_info(self) -> str:
        """Information about all tables in the database."""
        return self.get_table_info()
    def get_table_info(self, table_names: Optional[List[str]] = None) -> str:
        """Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        """
        all_table_names = self.get_usable_table_names()
        if table_names is not None:
            missing_tables = set(table_names).difference(all_table_names)
            if missing_tables:
                raise ValueError(
                    f"table_names {missing_tables} not found in database"
                )
            all_table_names = table_names

        metadata_table_names = [tbl.name for tbl in self._metadata.sorted_tables]
        to_reflect = set(all_table_names) - set(metadata_table_names)
        if to_reflect:
            self._metadata.reflect(
                views=self._view_support,
                bind=self._engine,
                only=list(to_reflect),
                schema=self._schema,
            )

        meta_tables = [
            tbl
            for tbl in self._metadata.sorted_tables
            if tbl.name in set(all_table_names)
            and not (self.dialect == "sqlite" and tbl.name.startswith("sqlite_"))
        ]

        tables = []
        for table in meta_tables:
            if self._custom_table_info and table.name in self._custom_table_info:
                tables.append(self._custom_table_info[table.name])
                continue

            # Ignore JSON datatyped columns
            for k, v in table.columns.items():
                # AttributeError: items in sqlalchemy v1
                if type(v.type) is NullType:
                    table._columns.remove(v)

            # add create table command
            create_table = str(CreateTable(table).compile(self._engine))
            table_info = f"{create_table.rstrip()}"
            has_extra_info = (
                self._indexes_in_table_info or self._sample_rows_in_table_info
            )
            if has_extra_info:
                table_info += "\n\n/*"
            if self._indexes_in_table_info:
                table_info += f"\n{self._get_table_indexes(table)}\n"
            if self._sample_rows_in_table_info:
                table_info += f"\n{self._get_sample_rows(table)}\n"
            if has_extra_info:
                table_info += "*/"
            tables.append(table_info)
        tables.sort()
        final_str = "\n\n".join(tables)
        return final_str
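    # Illustrative sketch (not part of the module): with the default settings, the
    # string assembled above is the dialect's CREATE TABLE statement followed by a
    # /* ... */ block containing the sample rows (and the indexes when
    # indexes_in_table_info=True). The table and rows below are invented:
    #
    #     CREATE TABLE users (
    #         id INTEGER NOT NULL,
    #         name VARCHAR,
    #         PRIMARY KEY (id)
    #     )
    #
    #     /*
    #     3 rows from users table:
    #     id    name
    #     1     alice
    #     2     bob
    #     3     carol
    #     */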
    def _get_table_indexes(self, table: Table) -> str:
        indexes = self._inspector.get_indexes(table.name)
        indexes_formatted = "\n".join(map(_format_index, indexes))
        return f"Table Indexes:\n{indexes_formatted}"

    def _get_sample_rows(self, table: Table) -> str:
        # build the select command
        command = select(table).limit(self._sample_rows_in_table_info)

        # save the columns in string format
        columns_str = "\t".join([col.name for col in table.columns])

        try:
            # get the sample rows
            with self._engine.connect() as connection:
                sample_rows_result = connection.execute(command)  # type: ignore
                # shorten values in the sample rows
                sample_rows = list(
                    map(lambda ls: [str(i)[:100] for i in ls], sample_rows_result)
                )

            # save the sample rows in string format
            sample_rows_str = "\n".join(["\t".join(row) for row in sample_rows])

        # in some dialects when there are no rows in the table a
        # 'ProgrammingError' is returned
        except ProgrammingError:
            sample_rows_str = ""

        return (
            f"{self._sample_rows_in_table_info} rows from {table.name} table:\n"
            f"{columns_str}\n"
            f"{sample_rows_str}"
        )

    def _execute(
        self,
        command: Union[str, Executable],
        fetch: Literal["all", "one", "cursor"] = "all",
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Union[Sequence[Dict[str, Any]], Result]:
        """
        Executes SQL command through underlying engine.

        If the statement returns no rows, an empty list is returned.
        """
        parameters = parameters or {}
        execution_options = execution_options or {}
        with self._engine.begin() as connection:  # type: Connection  # type: ignore[name-defined]
            if self._schema is not None:
                if self.dialect == "snowflake":
                    connection.exec_driver_sql(
                        "ALTER SESSION SET search_path = %s",
                        (self._schema,),
                        execution_options=execution_options,
                    )
                elif self.dialect == "bigquery":
                    connection.exec_driver_sql(
                        "SET @@dataset_id=?",
                        (self._schema,),
                        execution_options=execution_options,
                    )
                elif self.dialect == "mssql":
                    pass
                elif self.dialect == "trino":
                    connection.exec_driver_sql(
                        "USE ?",
                        (self._schema,),
                        execution_options=execution_options,
                    )
                elif self.dialect == "duckdb":
                    # Unclear which parameterized argument syntax duckdb supports.
                    # The docs for the duckdb client say they support multiple,
                    # but `duckdb_engine` seemed to struggle with all of them:
                    # https://github.com/Mause/duckdb_engine/issues/796
                    connection.exec_driver_sql(
                        f"SET search_path TO {self._schema}",
                        execution_options=execution_options,
                    )
                elif self.dialect == "oracle":
                    connection.exec_driver_sql(
                        f"ALTER SESSION SET CURRENT_SCHEMA = {self._schema}",
                        execution_options=execution_options,
                    )
                elif self.dialect == "sqlany":
                    # If anybody using Sybase SQL anywhere database then it should not
                    # go to else condition. It should be same as mssql.
                    pass
                elif self.dialect == "postgresql":  # postgresql
                    connection.exec_driver_sql(
                        "SET search_path TO %s",
                        (self._schema,),
                        execution_options=execution_options,
                    )

            if isinstance(command, str):
                command = text(command)
            elif isinstance(command, Executable):
                pass
            else:
                raise TypeError(f"Query expression has unknown type: {type(command)}")
            cursor = connection.execute(
                command,
                parameters,
                execution_options=execution_options,
            )

            if cursor.returns_rows:
                if fetch == "all":
                    result = [x._asdict() for x in cursor.fetchall()]
                elif fetch == "one":
                    first_result = cursor.fetchone()
                    result = [] if first_result is None else [first_result._asdict()]
                elif fetch == "cursor":
                    return cursor
                else:
                    raise ValueError(
                        "Fetch parameter must be either 'one', 'all', or 'cursor'"
                    )
                return result
        return []
    def run(
        self,
        command: Union[str, Executable],
        fetch: Literal["all", "one", "cursor"] = "all",
        include_columns: bool = False,
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Sequence[Dict[str, Any]], Result[Any]]:
        """Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.
        """
        result = self._execute(
            command, fetch, parameters=parameters, execution_options=execution_options
        )

        if fetch == "cursor":
            return result

        res = [
            {
                column: truncate_word(value, length=self._max_string_length)
                for column, value in r.items()
            }
            for r in result
        ]

        if not include_columns:
            res = [tuple(row.values()) for row in res]  # type: ignore[misc]

        if not res:
            return ""
        else:
            return str(res)
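    # Illustrative sketch (not part of the module): `run` stringifies the rows by
    # default, `fetch="one"` keeps only the first row, `include_columns=True`
    # preserves column names, and `parameters` are bound through SQLAlchemy's
    # `text()` named-parameter syntax. The `users` table and queries are assumptions.
    #
    #     db.run("SELECT id, name FROM users")
    #     # -> "[(1, 'alice'), (2, 'bob'), (3, 'carol')]"
    #     db.run("SELECT id, name FROM users", fetch="one", include_columns=True)
    #     # -> "[{'id': 1, 'name': 'alice'}]"
    #     db.run("SELECT id, name FROM users WHERE id = :id", parameters={"id": 2})
    #     # -> "[(2, 'bob')]"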
    def get_table_info_no_throw(self, table_names: Optional[List[str]] = None) -> str:
        """Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        """
        try:
            return self.get_table_info(table_names)
        except ValueError as e:
            """Format the error message"""
            return f"Error: {e}"
    def run_no_throw(
        self,
        command: str,
        fetch: Literal["all", "one"] = "all",
        include_columns: bool = False,
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Sequence[Dict[str, Any]], Result[Any]]:
        """Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.

        If the statement throws an error, the error message is returned.
        """
        try:
            return self.run(
                command,
                fetch,
                parameters=parameters,
                execution_options=execution_options,
                include_columns=include_columns,
            )
        except SQLAlchemyError as e:
            """Format the error message"""
            return f"Error: {e}"
    def get_context(self) -> Dict[str, Any]:
        """Return db context that you may want in agent prompt."""
        table_names = list(self.get_usable_table_names())
        table_info = self.get_table_info_no_throw()
        return {"table_info": table_info, "table_names": ", ".join(table_names)}
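# Illustrative sketch (not part of the module): a minimal end-to-end use, assuming
# a local SQLite file with a `users` table. `get_context` packages the table names
# and schema text for inclusion in an agent prompt.
#
#     from langchain_community.utilities.sql_database import SQLDatabase
#
#     db = SQLDatabase.from_uri("sqlite:///example.db")
#     print(db.dialect)                   # "sqlite"
#     print(db.get_usable_table_names())  # ["users", ...]
#     context = db.get_context()
#     print(context["table_names"])
#     print(context["table_info"])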