@staticmethod
def generate_object_id(input_string: Union[str, None] = None) -> str:
    """Build a 32-character hexadecimal object id.

    The id is the hex encoding of 16 bytes laid out as:
    4 bytes of the current Unix time (big-endian), the first 8 bytes of the
    SHA-256 digest of ``input_string``, and 4 random bytes.

    Args:
        input_string: Text hashed into the middle section of the id. When
            ``None``, a random 16-character alphanumeric string is used.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    id_hex_chars = 32  # length of the returned hex string
    digest_bytes = 8  # how many SHA-256 bytes are kept

    seed = input_string
    if seed is None:
        seed = "".join(
            random.choices(
                "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789",
                k=16,
            )
        )

    # 4 bytes: seconds since the epoch, big-endian unsigned int
    time_part = struct.pack(">I", int(time.time()))

    # 8 bytes: truncated SHA-256 of the seed
    hash_part = hashlib.sha256(seed.encode()).digest()[:digest_bytes]

    # 4 bytes: random value
    nonce_part = struct.pack(">I", random.getrandbits(32))

    # 16 bytes -> 32 hex characters; pad/trim to the target length
    raw_hex = (time_part + hash_part + nonce_part).hex()
    return raw_hex.zfill(id_hex_chars)[:id_hex_chars]
@staticmethod
def read_file(
    conn: Connection, file_path: str, params: dict
) -> Union[Document, None]:
    """Convert a local file to text and metadata through the Oracle database.

    The file's bytes are bound as a BLOB and passed to
    ``dbms_vector_chain.utl_to_text`` twice: once with ``params`` as the JSON
    preference (the result is parsed for metadata when it looks like HTML)
    and once without preferences (the result becomes the page content).

    Args:
        conn: Oracle database connection used to run the conversion.
        file_path: Path of the local file to read.
        params: Preferences serialized with ``json.dumps`` and passed to the
            first ``utl_to_text`` call.

    Returns:
        A Document whose metadata contains ``_oid`` and ``_file`` (plus any
        parsed metadata), or ``None`` when any step fails; the failure is
        logged and the file is skipped.

    Raises:
        ImportError: If ``oracledb`` is not installed.
    """
    try:
        import oracledb
    except ImportError as e:
        raise ImportError(
            "Unable to import oracledb, please install with "
            "`pip install -U oracledb`."
        ) from e

    metadata: Dict[str, Any] = {}
    try:
        oracledb.defaults.fetch_lobs = False

        # Read the file before opening a cursor so an I/O failure does not
        # leave a cursor behind.
        with open(file_path, "rb") as f:
            data = f.read()

        if data is None:
            return Document(page_content="", metadata=metadata)

        cursor = conn.cursor()
        try:
            mdata = cursor.var(oracledb.DB_TYPE_CLOB)
            text = cursor.var(oracledb.DB_TYPE_CLOB)
            cursor.execute(
                """
                declare
                    input blob;
                begin
                    input := :blob;
                    :mdata := dbms_vector_chain.utl_to_text(input, json(:pref));
                    :text := dbms_vector_chain.utl_to_text(input);
                end;""",
                blob=data,
                pref=json.dumps(params),
                mdata=mdata,
                text=text,
            )
        finally:
            # Close exactly once, whether or not execute() succeeded. The
            # previous handler closed an unbound or already-closed cursor.
            cursor.close()

        # Only HTML-looking output carries metadata that can be parsed.
        doc_data = str(mdata.getvalue())
        if doc_data.startswith(("<!DOCTYPE html", "<HTML>")):
            p = ParseOracleDocMetadata()
            p.feed(doc_data)
            metadata = p.get_metadata()

        doc_id = OracleDocReader.generate_object_id(
            conn.username + "$" + file_path
        )
        metadata["_oid"] = doc_id
        metadata["_file"] = file_path

        return Document(page_content=str(text.getvalue()), metadata=metadata)

    except Exception as ex:
        # Best-effort by design: callers treat None as "skip this file".
        logger.info(f"An exception occurred :: {ex}")
        logger.info(f"Skip processing {file_path}")
        return None
def load(self) -> List[Document]:
    """Load data into LangChain Document objects.

    Sources are taken from ``self.params`` and processed in this order:

    * ``file``: a single local file, converted with
      ``OracleDocReader.read_file``. If it cannot be read, the documents
      collected so far are returned immediately.
    * ``dir``: every regular file directly inside the directory; files that
      fail to convert are skipped and counted in the log.
    * ``tablename`` (with ``owner`` and ``colname``): every row of the
      table, converted in the database with
      ``dbms_vector_chain.utl_to_text``. Up to three extra columns listed
      in ``mdata_cols`` are copied into each document's metadata.

    Returns:
        The list of loaded documents.

    Raises:
        ImportError: If ``oracledb`` is not installed.
        Exception: If parameters are missing or invalid, or a database call
            fails. The error is logged and re-raised.
    """
    try:
        import oracledb
    except ImportError as e:
        raise ImportError(
            "Unable to import oracledb, please install with "
            "`pip install -U oracledb`."
        ) from e

    results: List[Document] = []
    m_params = {"plaintext": "false"}

    try:
        # extract the parameters
        if self.params is None:
            raise Exception("Missing loader parameters")
        self.file = self.params.get("file")
        self.dir = self.params.get("dir")
        self.owner = self.params.get("owner")
        self.tablename = self.params.get("tablename")
        self.colname = self.params.get("colname")

        oracledb.defaults.fetch_lobs = False

        if self.file:
            doc = OracleDocReader.read_file(self.conn, self.file, m_params)
            if doc is None:
                return results
            results.append(doc)

        if self.dir:
            skip_count = 0
            for file_name in os.listdir(self.dir):
                file_path = os.path.join(self.dir, file_name)
                if os.path.isfile(file_path):
                    doc = OracleDocReader.read_file(
                        self.conn, file_path, m_params
                    )
                    if doc is None:
                        skip_count = skip_count + 1
                        logger.info(f"Total skipped: {skip_count}\n")
                    else:
                        results.append(doc)

        if self.tablename:
            # Start as None so cleanup is safe when validation fails before
            # the cursor is created.
            cursor = None
            try:
                if self.owner is None or self.colname is None:
                    raise Exception("Missing owner or column name or both.")

                cursor = self.conn.cursor()
                self.mdata_cols = self.params.get("mdata_cols")
                if self.mdata_cols is not None:
                    if len(self.mdata_cols) > 3:
                        raise Exception(
                            "Exceeds the max number of columns "
                            "you can request for metadata."
                        )

                    # execute a query to get column data types
                    sql = (
                        "select column_name, data_type from all_tab_columns "
                        "where owner = :ownername and "
                        "table_name = :tablename"
                    )
                    cursor.execute(
                        sql,
                        ownername=self.owner.upper(),
                        tablename=self.tablename.upper(),
                    )

                    supported_types = [
                        "NUMBER",
                        "BINARY_DOUBLE",
                        "BINARY_FLOAT",
                        "LONG",
                        "DATE",
                        "TIMESTAMP",
                        "VARCHAR2",
                    ]
                    for row in cursor.fetchall():
                        if (
                            row[0] in self.mdata_cols
                            and row[1] not in supported_types
                        ):
                            raise Exception(
                                "The datatype for the column requested "
                                "for metadata is not supported."
                            )

                # Select list after the two text columns: rowid first, then
                # the requested metadata columns in order.
                self.mdata_cols_sql = ", rowid"
                if self.mdata_cols is not None:
                    for col in self.mdata_cols:
                        self.mdata_cols_sql = self.mdata_cols_sql + ", " + col

                # [TODO] use bind variables
                # NOTE: owner/table/column are identifiers and cannot be
                # bound; they are concatenated as provided by the caller.
                sql = (
                    "select dbms_vector_chain.utl_to_text(t."
                    + self.colname
                    + ", json('"
                    + json.dumps(m_params)
                    + "')) mdata, dbms_vector_chain.utl_to_text(t."
                    + self.colname
                    + ") text"
                    + self.mdata_cols_sql
                    + " from "
                    + self.owner
                    + "."
                    + self.tablename
                    + " t"
                )
                cursor.execute(sql)

                # Row layout: 0 = mdata, 1 = text, 2 = rowid, 3.. = mdata_cols
                for row in cursor:
                    metadata: Dict[str, Any] = {}

                    if row[0] is not None:
                        data = str(row[0])
                        if data.startswith(("<!DOCTYPE html", "<HTML>")):
                            p = ParseOracleDocMetadata()
                            p.feed(data)
                            metadata = p.get_metadata()

                    doc_id = OracleDocReader.generate_object_id(
                        self.conn.username
                        + "$"
                        + self.owner
                        + "$"
                        + self.tablename
                        + "$"
                        + self.colname
                        + "$"
                        + str(row[2])
                    )
                    metadata["_oid"] = doc_id
                    metadata["_rowid"] = row[2]

                    # process projected metadata cols; they start at index 3
                    # (index 2 is the rowid).
                    if self.mdata_cols is not None:
                        for i, col in enumerate(self.mdata_cols):
                            metadata[col] = row[i + 3]

                    page_content = "" if row[1] is None else str(row[1])
                    results.append(
                        Document(page_content=page_content, metadata=metadata)
                    )
            except Exception as ex:
                logger.info(f"An exception occurred :: {ex}")
                traceback.print_exc()
                raise
            finally:
                # Release the cursor on success and on failure alike.
                if cursor is not None:
                    cursor.close()

        return results
    except Exception as ex:
        logger.info(f"An exception occurred :: {ex}")
        traceback.print_exc()
        raise
class OracleTextSplitter(TextSplitter):
    """Splitting text using Oracle chunker."""

    def __init__(self, conn: Connection, params: Dict[str, Any], **kwargs: Any) -> None:
        """Initialize.

        Args:
            conn: Oracle database connection used to run the chunker.
            params: Chunking preferences; serialized to JSON and passed to
                ``dbms_vector_chain.utl_to_chunks`` on every split.
            **kwargs: Forwarded to ``TextSplitter.__init__``.

        Raises:
            ImportError: If ``oracledb`` is not installed.
        """
        self.conn = conn
        self.params = params
        super().__init__(**kwargs)

        import json

        # Single import guard: the former nested handler caught its own
        # ImportError and replaced it with a misleading message.
        try:
            import oracledb
        except ImportError as e:
            raise ImportError(
                "Unable to import oracledb, please install with "
                "`pip install -U oracledb`."
            ) from e

        self._oracledb = oracledb
        self._json = json

    def split_text(self, text: str) -> List[str]:
        """Split incoming text and return chunks.

        The text is bound as a CLOB and chunked in the database by
        ``dbms_vector_chain.utl_to_chunks``; each returned row is a JSON
        document whose ``chunk_data`` field is collected.

        Args:
            text: The text to split.

        Returns:
            The chunk strings in the order the database returned them.

        Raises:
            ImportError: If ``oracledb`` is not installed.
            Exception: Any database or JSON error, logged and re-raised.
        """
        try:
            import oracledb
        except ImportError as e:
            raise ImportError(
                "Unable to import oracledb, please install with "
                "`pip install -U oracledb`."
            ) from e

        splits = []

        try:
            # returns strings or bytes instead of a locator
            self._oracledb.defaults.fetch_lobs = False

            cursor = self.conn.cursor()
            try:
                cursor.setinputsizes(content=oracledb.CLOB)
                cursor.execute(
                    "select t.column_value from "
                    "dbms_vector_chain.utl_to_chunks(:content, json(:params)) t",
                    content=text,
                    params=self._json.dumps(self.params),
                )

                # fetchone() returns None once the result set is exhausted.
                for row in iter(cursor.fetchone, None):
                    d = self._json.loads(row[0])
                    splits.append(d["chunk_data"])
            finally:
                # Previously the cursor was never closed.
                cursor.close()

            return splits
        except Exception as ex:
            logger.info(f"An exception occurred :: {ex}")
            traceback.print_exc()
            raise