Source code for langchain_community.utilities.powerbi
"""Wrapper around a Power BI endpoint."""from__future__importannotationsimportasyncioimportloggingimportosfromtypingimportTYPE_CHECKING,Any,Dict,Iterable,List,Optional,UnionimportaiohttpimportrequestsfromaiohttpimportClientTimeout,ServerTimeoutErrorfrompydanticimport(BaseModel,ConfigDict,Field,model_validator,)fromrequests.exceptionsimportTimeoutlogger=logging.getLogger(__name__)BASE_URL=os.getenv("POWERBI_BASE_URL","https://api.powerbi.com/v1.0/myorg")ifTYPE_CHECKING:fromazure.core.credentialsimportTokenCredential
[docs]classPowerBIDataset(BaseModel):"""Create PowerBI engine from dataset ID and credential or token. Use either the credential or a supplied token to authenticate. If both are supplied the credential is used to generate a token. The impersonated_user_name is the UPN of a user to be impersonated. If the model is not RLS enabled, this will be ignored. """dataset_id:strtable_names:List[str]group_id:Optional[str]=Nonecredential:Optional[TokenCredential]=Nonetoken:Optional[str]=Noneimpersonated_user_name:Optional[str]=Nonesample_rows_in_table_info:int=Field(default=1,gt=0,le=10)schemas:Dict[str,str]=Field(default_factory=dict)aiosession:Optional[aiohttp.ClientSession]=Nonemodel_config=ConfigDict(arbitrary_types_allowed=True,)@model_validator(mode="before")@classmethoddefvalidate_params(cls,values:Dict[str,Any])->Any:"""Validate that at least one of token and credentials is present."""table_names=values.get("table_names",[])values["table_names"]=[fix_table_name(table)fortableintable_names]if"token"invaluesor"credential"invalues:returnvaluesraiseValueError("Please provide either a credential or a token.")@propertydefrequest_url(self)->str:"""Get the request url."""ifself.group_id:returnf"{BASE_URL}/groups/{self.group_id}/datasets/{self.dataset_id}/executeQueries"# noqa: E501 # pylint: disable=C0301returnf"{BASE_URL}/datasets/{self.dataset_id}/executeQueries"# pylint: disable=C0301@propertydefheaders(self)->Dict[str,str]:"""Get the token."""ifself.token:return{"Content-Type":"application/json","Authorization":"Bearer "+self.token,}fromazure.core.exceptionsimport(ClientAuthenticationError,# pylint: disable=import-outside-toplevel)ifself.credential:try:token=self.credential.get_token("https://analysis.windows.net/powerbi/api/.default").tokenreturn{"Content-Type":"application/json","Authorization":"Bearer "+token,}exceptExceptionasexc:# pylint: disable=broad-exception-caughtraiseClientAuthenticationError("Could not get a token from the supplied credentials.")fromexcraiseClientAuthenticationError("No credential or token supplied.")
[docs]defget_table_names(self)->Iterable[str]:"""Get names of tables available."""returnself.table_names
[docs]defget_schemas(self)->str:"""Get the available schema's."""ifself.schemas:return", ".join([f"{key}: {value}"forkey,valueinself.schemas.items()])return"No known schema's yet. Use the schema_powerbi tool first."
@propertydeftable_info(self)->str:"""Information about all tables in the database."""returnself.get_table_info()def_get_tables_to_query(self,table_names:Optional[Union[List[str],str]]=None)->Optional[List[str]]:"""Get the tables names that need to be queried, after checking they exist."""iftable_namesisnotNone:if(isinstance(table_names,list)andlen(table_names)>0andtable_names[0]!=""):fixed_tables=[fix_table_name(table)fortableintable_names]non_existing_tables=[tablefortableinfixed_tablesiftablenotinself.table_names]ifnon_existing_tables:logger.warning("Table(s) %s not found in dataset.",", ".join(non_existing_tables),)tables=[tablefortableinfixed_tablesiftablenotinnon_existing_tables]returntablesiftableselseNoneifisinstance(table_names,str)andtable_names!="":iftable_namesnotinself.table_names:logger.warning("Table %s not found in dataset.",table_names)returnNonereturn[fix_table_name(table_names)]returnself.table_namesdef_get_tables_todo(self,tables_todo:List[str])->List[str]:"""Get the tables that still need to be queried."""return[tablefortableintables_todoiftablenotinself.schemas]def_get_schema_for_tables(self,table_names:List[str])->str:"""Create a string of the table schemas for the supplied tables."""schemas=[schemafortable,schemainself.schemas.items()iftableintable_names]return", ".join(schemas)
[docs]defget_table_info(self,table_names:Optional[Union[List[str],str]]=None)->str:"""Get information about specified tables."""tables_requested=self._get_tables_to_query(table_names)iftables_requestedisNone:return"No (valid) tables requested."tables_todo=self._get_tables_todo(tables_requested)fortableintables_todo:self._get_schema(table)returnself._get_schema_for_tables(tables_requested)
[docs]asyncdefaget_table_info(self,table_names:Optional[Union[List[str],str]]=None)->str:"""Get information about specified tables."""tables_requested=self._get_tables_to_query(table_names)iftables_requestedisNone:return"No (valid) tables requested."tables_todo=self._get_tables_todo(tables_requested)awaitasyncio.gather(*[self._aget_schema(table)fortableintables_todo])returnself._get_schema_for_tables(tables_requested)
def_get_schema(self,table:str)->None:"""Get the schema for a table."""try:result=self.run(f"EVALUATE TOPN({self.sample_rows_in_table_info}, {table})")self.schemas[table]=json_to_md(result["results"][0]["tables"][0]["rows"])exceptTimeout:logger.warning("Timeout while getting table info for %s",table)self.schemas[table]="unknown"exceptExceptionasexc:# pylint: disable=broad-exception-caughtlogger.warning("Error while getting table info for %s: %s",table,exc)self.schemas[table]="unknown"asyncdef_aget_schema(self,table:str)->None:"""Get the schema for a table."""try:result=awaitself.arun(f"EVALUATE TOPN({self.sample_rows_in_table_info}, {table})")self.schemas[table]=json_to_md(result["results"][0]["tables"][0]["rows"])exceptServerTimeoutError:logger.warning("Timeout while getting table info for %s",table)self.schemas[table]="unknown"exceptExceptionasexc:# pylint: disable=broad-exception-caughtlogger.warning("Error while getting table info for %s: %s",table,exc)self.schemas[table]="unknown"def_create_json_content(self,command:str)->dict[str,Any]:"""Create the json content for the request."""return{"queries":[{"query":rf"{command}"}],"impersonatedUserName":self.impersonated_user_name,"serializerSettings":{"includeNulls":True},}
[docs]defrun(self,command:str)->Any:"""Execute a DAX command and return a json representing the results."""logger.debug("Running command: %s",command)response=requests.post(self.request_url,json=self._create_json_content(command),headers=self.headers,timeout=10,)ifresponse.status_code==403:return("TokenError: Could not login to PowerBI, please check your credentials.")returnresponse.json()
[docs]asyncdefarun(self,command:str)->Any:"""Execute a DAX command and return the result asynchronously."""logger.debug("Running command: %s",command)ifself.aiosession:asyncwithself.aiosession.post(self.request_url,headers=self.headers,json=self._create_json_content(command),timeout=ClientTimeout(total=10),)asresponse:ifresponse.status==403:return"TokenError: Could not login to PowerBI, please check your credentials."# noqa: E501response_json=awaitresponse.json(content_type=response.content_type)returnresponse_jsonasyncwithaiohttp.ClientSession()assession:asyncwithsession.post(self.request_url,headers=self.headers,json=self._create_json_content(command),timeout=ClientTimeout(total=10),)asresponse:ifresponse.status==403:return"TokenError: Could not login to PowerBI, please check your credentials."# noqa: E501response_json=awaitresponse.json(content_type=response.content_type)returnresponse_json
[docs]defjson_to_md(json_contents:List[Dict[str,Union[str,int,float]]],table_name:Optional[str]=None,)->str:"""Convert a JSON object to a markdown table."""iflen(json_contents)==0:return""output_md=""headers=json_contents[0].keys()forheaderinheaders:header.replace("[",".").replace("]","")iftable_name:header.replace(f"{table_name}.","")output_md+=f"| {header} "output_md+="|\n"forrowinjson_contents:forvalueinrow.values():output_md+=f"| {value} "output_md+="|\n"returnoutput_md
[docs]deffix_table_name(table:str)->str:"""Add single quotes around table names that contain spaces."""if" "intableandnottable.startswith("'")andnottable.endswith("'"):returnf"'{table}'"returntable