def __init__(
    self,
    lakefs_access_key: str,
    lakefs_secret_key: str,
    lakefs_endpoint: str,
):
    """Store the API root and credentials, then probe the server.

    Args:
        lakefs_access_key: Access key used for HTTP basic authentication.
        lakefs_secret_key: Secret key used for HTTP basic authentication.
        lakefs_endpoint: Base URL of the lakeFS server. ``api/v1/`` is
            appended to it to form the API root.

    Raises:
        ValueError: If the ``healthcheck`` request cannot be completed or
            returns an error status. The underlying exception is attached
            as ``__cause__``.
    """
    # Drop trailing slashes so "http://host/" does not turn into
    # "http://host//api/v1/".
    self.__endpoint = "/".join([lakefs_endpoint.rstrip("/"), "api", "v1/"])
    self.__auth = HTTPBasicAuth(lakefs_access_key, lakefs_secret_key)
    try:
        health_check = requests.get(
            urljoin(self.__endpoint, "healthcheck"), auth=self.__auth
        )
        health_check.raise_for_status()
    except Exception as err:
        # Keep the original failure (connection error, HTTP status, ...)
        # visible in the traceback instead of discarding it.
        raise ValueError(
            "lakeFS server isn't accessible. Make sure lakeFS is running."
        ) from err
def __validate_instance(self) -> None:
    """Check that repo, ref and path are set on this instance.

    The checks run in the order repo, ref, path, and the first one that
    fails raises.

    Raises:
        ValueError: If ``self.repo`` or ``self.ref`` is ``None`` or an
            empty string, or if ``self.path`` is ``None``.
    """
    if self.repo is None or self.repo == "":
        raise ValueError(
            "no repository was provided. use `set_repo` to specify a repository"
        )
    if self.ref is None or self.ref == "":
        raise ValueError("no ref was provided. use `set_ref` to specify a ref")
    # Unlike repo and ref, an empty path is accepted; only None is rejected.
    if self.path is None:
        raise ValueError("no path was provided. use `set_path` to specify a path")
class UnstructuredLakeFSLoader(UnstructuredBaseLoader):
    """Load from `lakeFS` as unstructured data."""

    # NOTE(review): the methods below read self.repo, self.ref, self.path,
    # self.url and self.presign; where those are assigned is not visible in
    # this part of the file.

    def _get_metadata(self) -> dict:
        """Return this instance's repo, ref and path as a dict."""
        return {"repo": self.repo, "ref": self.ref, "path": self.path}

    def _get_elements(self) -> List:
        """Partition the object behind ``self.url`` with ``unstructured``.

        When ``self.presign`` is truthy the URL is fetched over HTTP into a
        temporary directory and the downloaded file is partitioned.
        Otherwise the URL must start with ``local://`` and the remainder is
        partitioned as a local file path.

        Returns:
            Whatever ``unstructured.partition.auto.partition`` returns for
            the file.

        Raises:
            ValueError: If ``self.presign`` is falsy and ``self.url`` does
                not start with ``local://``.
        """
        # Imported lazily so `unstructured` is only needed when elements are
        # actually requested.
        from unstructured.partition.auto import partition

        local_prefix = "local://"

        if self.presign:
            with tempfile.TemporaryDirectory() as temp_dir:
                # Name the download after the last path component.
                file_path = f"{temp_dir}/{self.path.split('/')[-1]}"
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                response = requests.get(self.url)
                response.raise_for_status()
                with open(file_path, mode="wb") as file:
                    file.write(response.content)
                # Partition before leaving the `with`: the temporary
                # directory is removed on exit.
                return partition(filename=file_path)
        elif not self.url.startswith(local_prefix):
            raise ValueError(
                "Non pre-signed URLs are supported only with 'local' blockstore"
            )
        else:
            local_path = self.url[len(local_prefix):]
            return partition(filename=local_path)