def import_google() -> Tuple[Request, Credentials, ServiceCredentials]:
    """Lazily import the Google auth classes used by the Gmail toolkit.

    The imports are done inside the function so that the module itself can be
    imported without the optional Google dependencies being installed.

    Returns:
        Tuple[Request, Credentials, ServiceCredentials]: in this order, the
            ``google.auth.transport.requests.Request`` class, the
            ``google.oauth2.credentials.Credentials`` class and the
            ``google.oauth2.service_account.Credentials`` class.

    Raises:
        ImportError: if any of the Google auth packages cannot be imported.
            The original import failure is attached as ``__cause__``.
    """
    try:
        from google.auth.transport.requests import Request  # noqa: F401
        from google.oauth2.credentials import Credentials  # noqa: F401
        from google.oauth2.service_account import Credentials as ServiceCredentials
    except ImportError as exc:
        # Chain explicitly so the traceback shows which package was missing.
        raise ImportError(
            "You need to install gmail dependencies to use this toolkit. "
            "Try running poetry install --with gmail"
        ) from exc
    return Request, Credentials, ServiceCredentials  # type: ignore[return-value]
def import_installed_app_flow() -> InstalledAppFlow:
    """Lazily import the ``InstalledAppFlow`` class.

    The import is done inside the function so that the module itself can be
    imported without ``google_auth_oauthlib`` being installed.

    Returns:
        InstalledAppFlow: the ``google_auth_oauthlib.flow.InstalledAppFlow``
            class (the class itself, not an instance).

    Raises:
        ImportError: if ``google_auth_oauthlib`` cannot be imported. The
            original import failure is attached as ``__cause__``.
    """
    try:
        from google_auth_oauthlib.flow import InstalledAppFlow
    except ImportError as exc:
        # The message previously named the "bigquery" dependency group while
        # pointing at the gmail extra; it now names the gmail group.
        raise ImportError(
            "You need to install gmail dependencies to use this toolkit. "
            "Please, install gmail dependency group: "
            "`pip install langchain-google-community[gmail]`"
        ) from exc
    return InstalledAppFlow
def import_googleapiclient_resource_builder() -> build_resource:
    """Fetch the ``build`` factory from ``googleapiclient.discovery``.

    The import happens at call time rather than at module import time.

    Returns:
        build_resource: the ``googleapiclient.discovery.build`` function.

    Raises:
        ImportError: if ``googleapiclient.discovery`` cannot be imported.
    """
    try:
        from googleapiclient.discovery import build
    except ImportError:
        install_hint = (
            "You need to install all dependencies to use this toolkit. "
            "Try running pip install langchain-google-community"
        )
        raise ImportError(install_hint)
    else:
        return build
def get_gmail_credentials(
    token_file: Optional[str] = None,
    client_secrets_file: Optional[str] = None,
    service_account_file: Optional[str] = None,
    scopes: Optional[List[str]] = None,
    use_domain_wide: bool = False,
    delegated_user: Optional[str] = None,
) -> Credentials:
    """Obtain Google credentials for the Gmail toolkit.

    Two modes are supported, selected by ``use_domain_wide``:

    * Service-account mode (``use_domain_wide=True``): credentials are loaded
      from ``service_account_file`` and, when ``delegated_user`` is given,
      re-scoped to that subject. ``token_file`` and ``client_secrets_file``
      are not used in this mode.
    * Installed-app mode (default): credentials are read from ``token_file``
      when it exists, refreshed if expired, or obtained through a local-server
      OAuth flow driven by ``client_secrets_file``. Newly obtained or
      refreshed credentials are written back to ``token_file``.
      ``service_account_file`` and ``delegated_user`` are not used in this
      mode.

    Args:
        token_file: Path of the stored user token; falls back to
            ``DEFAULT_CREDS_TOKEN_FILE`` when falsy.
        client_secrets_file: Path of the OAuth client secrets; falls back to
            ``DEFAULT_CLIENT_SECRETS_FILE`` when falsy.
        service_account_file: Path of the service-account key file; falls back
            to ``DEFAULT_SERVICE_ACCOUNT_FILE`` when falsy.
        scopes: OAuth scopes; falls back to ``DEFAULT_SERVICE_SCOPES`` in
            service-account mode and ``DEFAULT_SCOPES`` otherwise.
        use_domain_wide: Select service-account mode.
        delegated_user: Subject to impersonate in service-account mode.

    Returns:
        The credentials object produced by the selected mode.
    """
    if use_domain_wide:
        _, _, ServiceCredentials = import_google()
        sa_path = service_account_file or DEFAULT_SERVICE_ACCOUNT_FILE
        sa_scopes = scopes or DEFAULT_SERVICE_SCOPES
        sa_creds = ServiceCredentials.from_service_account_file(
            sa_path, scopes=sa_scopes
        )
        if delegated_user:
            return sa_creds.with_subject(delegated_user)
        return sa_creds

    # Installed-app flow, following
    # https://developers.google.com/gmail/api/quickstart/python
    Request, Credentials, _ = import_google()
    InstalledAppFlow = import_installed_app_flow()

    oauth_scopes = scopes or DEFAULT_SCOPES
    token_path = token_file or DEFAULT_CREDS_TOKEN_FILE
    secrets_path = client_secrets_file or DEFAULT_CLIENT_SECRETS_FILE

    # The token file holds the user's access and refresh tokens; it is created
    # automatically the first time the authorization flow completes.
    user_creds = None
    if os.path.exists(token_path):
        user_creds = Credentials.from_authorized_user_file(token_path, oauth_scopes)

    # Usable credentials on disk: nothing more to do.
    if user_creds and user_creds.valid:
        return user_creds

    if user_creds and user_creds.expired and user_creds.refresh_token:
        user_creds.refresh(Request())  # type: ignore[call-arg]
    else:
        # https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application # noqa
        oauth_flow = InstalledAppFlow.from_client_secrets_file(
            secrets_path, oauth_scopes
        )
        user_creds = oauth_flow.run_local_server(port=0)

    # Persist the refreshed / newly granted credentials for the next run.
    with open(token_path, "w") as token:
        token.write(user_creds.to_json())
    return user_creds
def build_resource_service(
    credentials: Optional[Credentials] = None,
    service_name: str = "gmail",
    service_version: str = "v1",
    use_domain_wide: bool = False,
    delegated_user: Optional[str] = None,
    service_account_file: Optional[str] = None,
    scopes: Optional[List[str]] = None,
) -> Resource:
    """Create a Google API service resource (Gmail ``v1`` by default).

    Args:
        credentials: Credentials to use. When falsy, they are obtained from
            ``get_gmail_credentials`` using the remaining arguments.
        service_name: API name handed to the resource builder.
        service_version: API version handed to the resource builder.
        use_domain_wide: Forwarded to ``get_gmail_credentials``.
        delegated_user: Forwarded to ``get_gmail_credentials``.
        service_account_file: Forwarded to ``get_gmail_credentials``.
        scopes: Forwarded to ``get_gmail_credentials``.

    Returns:
        The object returned by the googleapiclient resource builder.
    """
    if not credentials:
        # No usable credentials were supplied: resolve them from the options.
        credentials = get_gmail_credentials(
            use_domain_wide=use_domain_wide,
            delegated_user=delegated_user,
            service_account_file=service_account_file,
            scopes=scopes,
        )
    make_service = import_googleapiclient_resource_builder()
    return make_service(service_name, service_version, credentials=credentials)
def clean_email_body(body: str) -> str:
    """Strip markup from an email body, best-effort.

    The body is parsed with BeautifulSoup's ``html.parser`` and reduced to its
    text. If BeautifulSoup is not installed a warning is logged, and if parsing
    fails the error is logged; in both cases the body is returned unchanged
    (coerced to ``str``).

    Args:
        body: The raw email body.

    Returns:
        The extracted text, or ``str(body)`` when cleaning was not possible.
    """
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        logger.warning("BeautifulSoup not installed. Skipping cleaning.")
        return str(body)

    try:
        parsed = BeautifulSoup(str(body), "html.parser")
        text = parsed.get_text()
        return str(text)
    except Exception as err:
        # Cleaning is optional: log the failure and fall back to the raw body.
        logger.error(err)
        return str(body)