Source code for langchain_community.utilities.asknews
"""Util that calls AskNews api."""from__future__importannotationsfromdatetimeimportdatetime,timedeltafromtypingimportAny,Dict,Optionalfromlangchain_core.utilsimportget_from_dict_or_envfrompydanticimportBaseModel,ConfigDict,model_validator
# Lookbacks longer than this many hours are sent as "historical" keyword
# searches with an explicit timestamp window (see _build_search_kwargs).
_HISTORICAL_THRESHOLD_HOURS = 48


def _build_search_kwargs(
    query: str, max_results: int, hours_back: int
) -> Dict[str, Any]:
    """Build the keyword arguments shared by the sync and async news searches.

    Args:
        query: Search query forwarded unchanged to the SDK.
        max_results: Forwarded to the SDK as ``n_articles``.
        hours_back: How far back to search, in hours. Values above
            ``_HISTORICAL_THRESHOLD_HOURS`` select ``method="kw"`` with
            ``historical=True`` and an explicit ``[start, end]`` epoch window;
            any other value selects ``method="nl"`` with ``historical=False``
            and no timestamps.

    Returns:
        A dict that can be splatted into ``news.search_news(**kwargs)``.
    """
    if hours_back > _HISTORICAL_THRESHOLD_HOURS:
        method = "kw"
        historical = True
        # Take a single "now" snapshot and subtract elapsed seconds from the
        # epoch value. Subtracting a timedelta from a naive local datetime and
        # then calling .timestamp() shifts the window by an hour whenever a
        # DST transition falls inside it.
        stop: Optional[int] = int(datetime.now().timestamp())
        start: Optional[int] = stop - int(
            timedelta(hours=hours_back).total_seconds()
        )
    else:
        method = "nl"
        historical = False
        start = None
        stop = None

    return {
        "query": query,
        "n_articles": max_results,
        "method": method,
        "historical": historical,
        "start_timestamp": start,
        "end_timestamp": stop,
        "return_type": "string",
    }


class AskNewsAPIWrapper(BaseModel):
    """Wrapper for AskNews API.

    On construction, the client ID and secret are resolved from the supplied
    values or from the ``ASKNEWS_CLIENT_ID`` / ``ASKNEWS_CLIENT_SECRET``
    environment variables, and both a synchronous and an asynchronous
    ``asknews_sdk`` client are created with the ``news`` scope.
    """

    asknews_sync: Any = None  #: :meta private:
    asknews_async: Any = None  #: :meta private:
    asknews_client_id: Optional[str] = None
    """Client ID for the AskNews API."""
    asknews_client_secret: Optional[str] = None
    """Client Secret for the AskNews API."""

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Validate that api credentials and python package exists in environment.

        Args:
            values: Raw field values passed to the model before validation.

        Returns:
            ``values`` with the resolved credentials and the two SDK clients
            stored under ``asknews_sync`` and ``asknews_async``.

        Raises:
            ImportError: If the ``asknews_sdk`` module cannot be imported.
        """
        asknews_client_id = get_from_dict_or_env(
            values, "asknews_client_id", "ASKNEWS_CLIENT_ID"
        )
        asknews_client_secret = get_from_dict_or_env(
            values, "asknews_client_secret", "ASKNEWS_CLIENT_SECRET"
        )

        try:
            import asknews_sdk
        except ImportError as err:
            # Chain the original error so the underlying import failure
            # stays visible in the traceback.
            raise ImportError(
                "AskNews python package not found. "
                "Please install it with `pip install asknews`."
            ) from err

        an_sync = asknews_sdk.AskNewsSDK(
            client_id=asknews_client_id,
            client_secret=asknews_client_secret,
            scopes=["news"],
        )
        an_async = asknews_sdk.AsyncAskNewsSDK(
            client_id=asknews_client_id,
            client_secret=asknews_client_secret,
            scopes=["news"],
        )

        values["asknews_sync"] = an_sync
        values["asknews_async"] = an_async
        values["asknews_client_id"] = asknews_client_id
        values["asknews_client_secret"] = asknews_client_secret

        return values

    def search_news(
        self, query: str, max_results: int = 10, hours_back: int = 0
    ) -> str:
        """Search news in AskNews API synchronously.

        Args:
            query: Search query.
            max_results: Number of articles to request.
            hours_back: How many hours back to search. Values above 48 switch
                to a historical keyword search over an explicit time window.

        Returns:
            The ``as_string`` attribute of the SDK response.
        """
        response = self.asknews_sync.news.search_news(
            **_build_search_kwargs(query, max_results, hours_back)
        )
        return response.as_string

    async def asearch_news(
        self, query: str, max_results: int = 10, hours_back: int = 0
    ) -> str:
        """Search news in AskNews API asynchronously.

        Args:
            query: Search query.
            max_results: Number of articles to request.
            hours_back: How many hours back to search. Values above 48 switch
                to a historical keyword search over an explicit time window.

        Returns:
            The ``as_string`` attribute of the SDK response.
        """
        response = await self.asknews_async.news.search_news(
            **_build_search_kwargs(query, max_results, hours_back)
        )
        return response.as_string