Source code for langchain_community.tools.nuclia.tool
"""Tool for the Nuclia Understanding API.Installation:```bash pip install --upgrade protobuf pip install nucliadb-protos```"""importasyncioimportbase64importloggingimportmimetypesimportosfromtypingimportAny,Dict,Optional,Type,Unionimportrequestsfromlangchain_core.callbacksimport(AsyncCallbackManagerForToolRun,CallbackManagerForToolRun,)fromlangchain_core.toolsimportBaseToolfrompydanticimportBaseModel,Fieldlogger=logging.getLogger(__name__)
class NUASchema(BaseModel):
    """Input for Nuclia Understanding API.

    Attributes:
        action: Action to perform. Either `push` or `pull`.
        id: ID of the file to push or pull.
        path: Path to the file to push (needed only for `push` action).
        text: Text content to process (needed only for `push` action).
    """

    action: str = Field(
        ...,
        description="Action to perform. Either `push` or `pull`.",
    )
    id: str = Field(
        ...,
        description="ID of the file to push or pull.",
    )
    path: Optional[str] = Field(
        ...,
        description="Path to the file to push (needed only for `push` action).",
    )
    text: Optional[str] = Field(
        ...,
        description="Text content to process (needed only for `push` action).",
    )
class NucliaUnderstandingAPI(BaseTool):  # type: ignore[override, override]
    """Tool to process files with the Nuclia Understanding API."""

    name: str = "nuclia_understanding_api"
    description: str = (
        "A wrapper around Nuclia Understanding API endpoints. "
        "Useful for when you need to extract text from any kind of files. "
    )
    args_schema: Type[BaseModel] = NUASchema
    _results: Dict[str, Any] = {}
    _config: Dict[str, Any] = {}

    def __init__(self, enable_ml: bool = False) -> None:
        zone = os.environ.get("NUCLIA_ZONE", "europe-1")
        self._config["BACKEND"] = f"https://{zone}.nuclia.cloud/api/v1"
        key = os.environ.get("NUCLIA_NUA_KEY")
        if not key:
            raise ValueError("NUCLIA_NUA_KEY environment variable not set")
        else:
            self._config["NUA_KEY"] = key
        self._config["enable_ml"] = enable_ml
        super().__init__()  # type: ignore[call-arg]

    def _run(
        self,
        action: str,
        id: str,
        path: Optional[str],
        text: Optional[str],
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool."""
        if action == "push":
            self._check_params(path, text)
            if path:
                return self._pushFile(id, path)
            if text:
                return self._pushText(id, text)
        elif action == "pull":
            return self._pull(id)
        return ""

    async def _arun(
        self,
        action: str,
        id: str,
        path: Optional[str] = None,
        text: Optional[str] = None,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool asynchronously."""
        self._check_params(path, text)
        if path:
            self._pushFile(id, path)
        if text:
            self._pushText(id, text)
        data = None
        while True:
            data = self._pull(id)
            if data:
                break
            await asyncio.sleep(15)
        return data

    def _pushText(self, id: str, text: str) -> str:
        field = {
            "textfield": {"text": {"body": text, "format": 0}},
            "processing_options": {"ml_text": self._config["enable_ml"]},
        }
        return self._pushField(id, field)

    def _pushFile(self, id: str, content_path: str) -> str:
        with open(content_path, "rb") as source_file:
            response = requests.post(
                self._config["BACKEND"] + "/processing/upload",
                headers={
                    "content-type": mimetypes.guess_type(content_path)[0]
                    or "application/octet-stream",
                    "x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
                },
                data=source_file.read(),
            )
            if response.status_code != 200:
                logger.info(
                    f"Error uploading {content_path}: "
                    f"{response.status_code} {response.text}"
                )
                return ""
            else:
                field = {
                    "filefield": {"file": f"{response.text}"},
                    "processing_options": {"ml_text": self._config["enable_ml"]},
                }
                return self._pushField(id, field)

    def _pushField(self, id: str, field: Any) -> str:
        logger.info(f"Pushing {id} in queue")
        response = requests.post(
            self._config["BACKEND"] + "/processing/push",
            headers={
                "content-type": "application/json",
                "x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
            },
            json=field,
        )
        if response.status_code != 200:
            logger.info(
                f"Error pushing field {id}: {response.status_code} {response.text}"
            )
            raise ValueError("Error pushing field")
        else:
            uuid = response.json()["uuid"]
            logger.info(f"Field {id} pushed in queue, uuid: {uuid}")
            self._results[id] = {"uuid": uuid, "status": "pending"}
            return uuid

    def _pull(self, id: str) -> str:
        self._pull_queue()
        result = self._results.get(id, None)
        if not result:
            logger.info(f"{id} not in queue")
            return ""
        elif result["status"] == "pending":
            logger.info(f"Waiting for {result['uuid']} to be processed")
            return ""
        else:
            return result["data"]

    def _pull_queue(self) -> None:
        try:
            from nucliadb_protos.writer_pb2 import BrokerMessage
        except ImportError as e:
            raise ImportError(
                "nucliadb-protos is not installed. "
                "Run `pip install nucliadb-protos` to install."
            ) from e
        try:
            from google.protobuf.json_format import MessageToJson
        except ImportError as e:
            raise ImportError(
                "Unable to import google.protobuf, please install with "
                "`pip install protobuf`."
            ) from e

        res = requests.get(
            self._config["BACKEND"] + "/processing/pull",
            headers={
                "x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
            },
        ).json()
        if res["status"] == "empty":
            logger.info("Queue empty")
        elif res["status"] == "ok":
            payload = res["payload"]
            pb = BrokerMessage()
            pb.ParseFromString(base64.b64decode(payload))
            uuid = pb.uuid
            logger.info(f"Pulled {uuid} from queue")
            matching_id = self._find_matching_id(uuid)
            if not matching_id:
                logger.info(f"No matching id for {uuid}")
            else:
                self._results[matching_id]["status"] = "done"
                data = MessageToJson(
                    pb,
                    preserving_proto_field_name=True,
                    including_default_value_fields=True,  # type: ignore
                )
                self._results[matching_id]["data"] = data

    def _find_matching_id(self, uuid: str) -> Union[str, None]:
        for id, result in self._results.items():
            if result["uuid"] == uuid:
                return id
        return None

    def _check_params(self, path: Optional[str], text: Optional[str]) -> None:
        if not path and not text:
            raise ValueError("File path or text is required")
        if path and text:
            raise ValueError("Cannot process both file and text on a single run")