[docs]classZillizCloudPipelineRetriever(BaseRetriever):"""`Zilliz Cloud Pipeline` retriever. Parameters: pipeline_ids: A dictionary of pipeline ids. Valid keys: "ingestion", "search", "deletion". token: Zilliz Cloud's token. Defaults to "". cloud_region: The region of Zilliz Cloud's cluster. Defaults to 'gcp-us-west1'. """pipeline_ids:Dicttoken:str=""cloud_region:str="gcp-us-west1"def_get_relevant_documents(self,query:str,top_k:int=10,offset:int=0,output_fields:List=[],filter:str="",*,run_manager:CallbackManagerForRetrieverRun,)->List[Document]:""" Get documents relevant to a query. Args: query: String to find relevant documents for top_k: The number of results. Defaults to 10. offset: The number of records to skip in the search result. Defaults to 0. output_fields: The extra fields to present in output. filter: The Milvus expression to filter search results. Defaults to "". run_manager: The callbacks handler to use. Returns: List of relevant documents """if"search"inself.pipeline_ids:search_pipe_id=self.pipeline_ids.get("search")else:raiseException("A search pipeline id must be provided in pipeline_ids to ""get relevant documents.")domain=(f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines")headers={"Authorization":f"Bearer {self.token}","Accept":"application/json","Content-Type":"application/json",}url=f"{domain}/{search_pipe_id}/run"params={"data":{"query_text":query},"params":{"limit":top_k,"offset":offset,"outputFields":output_fields,"filter":filter,},}response=requests.post(url,headers=headers,json=params)ifresponse.status_code!=200:raiseRuntimeError(response.text)response_dict=response.json()ifresponse_dict["code"]!=200:raiseRuntimeError(response_dict)response_data=response_dict["data"]search_results=response_data["result"]return[Document(page_content=result.pop("text")if"text"inresultelseresult.pop("chunk_text"),metadata=result,)forresultinsearch_results]
[docs]defadd_texts(self,texts:List[str],metadata:Optional[Dict[str,Any]]=None)->Dict:""" Add documents to store. Only supported by a text ingestion pipeline in Zilliz Cloud. Args: texts: A list of text strings. metadata: A key-value dictionary of metadata will be inserted as preserved fields required by ingestion pipeline. Defaults to None. """if"ingestion"inself.pipeline_ids:ingeset_pipe_id=self.pipeline_ids.get("ingestion")else:raiseException("An ingestion pipeline id must be provided in pipeline_ids to"" add documents.")domain=(f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines")headers={"Authorization":f"Bearer {self.token}","Accept":"application/json","Content-Type":"application/json",}url=f"{domain}/{ingeset_pipe_id}/run"metadata={}ifmetadataisNoneelsemetadataparams={"data":{"text_list":texts}}params["data"].update(metadata)response=requests.post(url,headers=headers,json=params)ifresponse.status_code!=200:raiseException(response.text)response_dict=response.json()ifresponse_dict["code"]!=200:raiseException(response_dict)response_data=response_dict["data"]returnresponse_data
[docs]defadd_doc_url(self,doc_url:str,metadata:Optional[Dict[str,Any]]=None)->Dict:""" Add a document from url. Only supported by a document ingestion pipeline in Zilliz Cloud. Args: doc_url: A document url. metadata: A key-value dictionary of metadata will be inserted as preserved fields required by ingestion pipeline. Defaults to None. """if"ingestion"inself.pipeline_ids:ingest_pipe_id=self.pipeline_ids.get("ingestion")else:raiseException("An ingestion pipeline id must be provided in pipeline_ids to ""add documents.")domain=(f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines")headers={"Authorization":f"Bearer {self.token}","Accept":"application/json","Content-Type":"application/json",}url=f"{domain}/{ingest_pipe_id}/run"params={"data":{"doc_url":doc_url}}metadata={}ifmetadataisNoneelsemetadataparams["data"].update(metadata)response=requests.post(url,headers=headers,json=params)ifresponse.status_code!=200:raiseException(response.text)response_dict=response.json()ifresponse_dict["code"]!=200:raiseException(response_dict)response_data=response_dict["data"]returnresponse_data
[docs]defdelete(self,key:str,value:Any)->Dict:""" Delete documents. Only supported by a deletion pipeline in Zilliz Cloud. Args: key: input name to run the deletion pipeline value: input value to run deletion pipeline """if"deletion"inself.pipeline_ids:deletion_pipe_id=self.pipeline_ids.get("deletion")else:raiseException("A deletion pipeline id must be provided in pipeline_ids to ""add documents.")domain=(f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines")headers={"Authorization":f"Bearer {self.token}","Accept":"application/json","Content-Type":"application/json",}url=f"{domain}/{deletion_pipe_id}/run"params={"data":{key:value}}response=requests.post(url,headers=headers,json=params)ifresponse.status_code!=200:raiseException(response.text)response_dict=response.json()ifresponse_dict["code"]!=200:raiseException(response_dict)response_data=response_dict["data"]returnresponse_data