Source code for langchain_community.tools.searx_search.tool
"""Tool for the SearxNG search API."""fromtypingimportOptional,Typefromlangchain_core.callbacksimport(AsyncCallbackManagerForToolRun,CallbackManagerForToolRun,)fromlangchain_core.toolsimportBaseToolfrompydanticimportBaseModel,ConfigDict,Fieldfromlangchain_community.utilities.searx_searchimportSearxSearchWrapper
[docs]classSearxSearchQueryInput(BaseModel):"""Input for the SearxSearch tool."""query:str=Field(description="query to look up on searx")
[docs]classSearxSearchRun(BaseTool):# type: ignore[override, override]"""Tool that queries a Searx instance."""name:str="searx_search"description:str=("A meta search engine.""Useful for when you need to answer questions about current events.""Input should be a search query.")wrapper:SearxSearchWrapperkwargs:dict=Field(default_factory=dict)args_schema:Type[BaseModel]=SearxSearchQueryInputdef_run(self,query:str,run_manager:Optional[CallbackManagerForToolRun]=None,)->str:"""Use the tool."""returnself.wrapper.run(query,**self.kwargs)asyncdef_arun(self,query:str,run_manager:Optional[AsyncCallbackManagerForToolRun]=None,)->str:"""Use the tool asynchronously."""returnawaitself.wrapper.arun(query,**self.kwargs)
[docs]classSearxSearchResults(BaseTool):# type: ignore[override, override]"""Tool that queries a Searx instance and gets back json."""name:str="searx_search_results"description:str=("A meta search engine.""Useful for when you need to answer questions about current events.""Input should be a search query. Output is a JSON array of the query results")wrapper:SearxSearchWrappernum_results:int=4kwargs:dict=Field(default_factory=dict)args_schema:Type[BaseModel]=SearxSearchQueryInputmodel_config=ConfigDict(extra="allow",)def_run(self,query:str,run_manager:Optional[CallbackManagerForToolRun]=None,)->str:"""Use the tool."""returnstr(self.wrapper.results(query,self.num_results,**self.kwargs))asyncdef_arun(self,query:str,run_manager:Optional[AsyncCallbackManagerForToolRun]=None,)->str:"""Use the tool asynchronously."""return(awaitself.wrapper.aresults(query,self.num_results,**self.kwargs)).__str__()