def concatenate_rows(row: dict) -> str:
    """Combine message information in a readable format ready to be used.

    Args:
        row: A message record containing "date", "from" and "text" keys.

    Returns:
        A human-readable line of the form ``"<sender> on <date>: <text>"``
        followed by a blank line.
    """
    return "{} on {}: {}\n\n".format(row["from"], row["date"], row["text"])
class TelegramChatFileLoader(BaseLoader):
    """Load from `Telegram chat` dump."""

    def __init__(self, path: Union[str, Path]):
        """Initialize with a path.

        Args:
            path: Path to the Telegram chat JSON dump file.
        """
        # Stored as given; opened lazily when documents are loaded.
        self.file_path = path
def text_to_docs(text: Union[str, List[str]]) -> List[Document]:
    """Convert a string or list of strings to a list of Documents with metadata.

    Each input string is treated as one "page"; pages are split into
    overlapping chunks and every chunk becomes its own ``Document`` with
    ``page``, ``chunk`` and ``source`` metadata.
    """
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        chunk_overlap=20,
    )

    # A single string is treated as one page.
    pages = [text] if isinstance(text, str) else text

    doc_chunks: List[Document] = []
    # Page numbers are 1-based; chunk numbers are 0-based, matching the
    # original metadata layout.
    for page_num, page in enumerate(pages, start=1):
        for chunk_num, chunk in enumerate(splitter.split_text(page)):
            chunk_doc = Document(
                page_content=chunk,
                metadata={"page": page_num, "chunk": chunk_num},
            )
            # Record provenance as "<page>-<chunk>".
            chunk_doc.metadata["source"] = f"{page_num}-{chunk_num}"
            doc_chunks.append(chunk_doc)
    return doc_chunks
def __init__(
    self,
    chat_entity: Optional[EntityLike] = None,
    api_id: Optional[int] = None,
    api_hash: Optional[str] = None,
    username: Optional[str] = None,
    file_path: str = "telegram_data.json",
):
    """Initialize with API parameters.

    Args:
        chat_entity: The chat entity to fetch data from.
        api_id: The API ID.
        api_hash: The API hash.
        username: The username.
        file_path: The file path to save the data to. Defaults to
            "telegram_data.json".
    """
    self.chat_entity = chat_entity
    self.api_id = api_id
    self.api_hash = api_hash
    self.username = username
    # Destination for the JSON dump written by fetch_data_from_telegram()
    # and later read back by load().
    self.file_path = file_path
async def fetch_data_from_telegram(self) -> None:
    """Fetch data from Telegram API and save it as a JSON file.

    Connects with the configured credentials, iterates over every message
    in ``self.chat_entity`` and writes one record per message to
    ``self.file_path`` as a JSON array.
    """
    from telethon.sync import TelegramClient

    records = []
    async with TelegramClient(self.username, self.api_id, self.api_hash) as client:
        async for message in client.iter_messages(self.chat_entity):
            reply = message.reply_to
            records.append(
                {
                    "sender_id": message.sender_id,
                    "text": message.text,
                    "date": message.date.isoformat(),
                    "message.id": message.id,
                    "is_reply": reply is not None,
                    "reply_to_id": reply.reply_to_msg_id
                    if reply is not None
                    else None,
                }
            )

    with open(self.file_path, "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=4)
def _get_message_threads(self, data: pd.DataFrame) -> dict:
    """Create a dictionary of message threads from the given data.

    Args:
        data (pd.DataFrame): A DataFrame containing the conversation \
            data with columns:
            - message.sender_id
            - text
            - date
            - message.id
            - is_reply
            - reply_to_id

    Returns:
        dict: A dictionary where the key is the parent message ID and \
            the value is a list of message IDs in ascending order.
    """

    def find_replies(parent_id: int, reply_data: pd.DataFrame) -> List[int]:
        """
        Recursively find all replies to a given parent message ID.

        Args:
            parent_id (int): The parent message ID.
            reply_data (pd.DataFrame): A DataFrame containing reply messages.

        Returns:
            list: A list of message IDs that are replies to the parent
                message ID.
        """
        # Find direct replies to the parent message ID
        direct_replies = reply_data[reply_data["reply_to_id"] == parent_id][
            "message.id"
        ].tolist()

        # Recursively find replies to the direct replies
        all_replies = []
        for reply_id in direct_replies:
            all_replies += [reply_id] + find_replies(reply_id, reply_data)

        return all_replies

    # Filter out parent messages
    parent_messages = data[~data["is_reply"]]

    # Filter out reply messages and drop rows with NaN in 'reply_to_id'.
    # Fix: take an explicit copy before assigning into the frame — writing
    # into a boolean-mask slice of `data` raises SettingWithCopyWarning and
    # is a silent no-op under pandas copy-on-write.
    reply_messages = data[data["is_reply"]].dropna(subset=["reply_to_id"]).copy()

    # Convert 'reply_to_id' to integer
    reply_messages["reply_to_id"] = reply_messages["reply_to_id"].astype(int)

    # Create a dictionary of message threads with parent message IDs as keys
    # and lists of reply message IDs as values
    message_threads = {
        parent_id: [parent_id] + find_replies(parent_id, reply_messages)
        for parent_id in parent_messages["message.id"]
    }

    return message_threads


def _combine_message_texts(
    self, message_threads: Dict[int, List[int]], data: pd.DataFrame
) -> str:
    """
    Combine the message texts for each parent message ID based \
    on the list of message threads.

    Args:
        message_threads (dict): A dictionary where the key is the parent
            message ID and the value is a list of message IDs in ascending
            order.
        data (pd.DataFrame): A DataFrame containing the conversation data:
            - message.sender_id
            - text
            - date
            - message.id
            - is_reply
            - reply_to_id

    Returns:
        str: A combined string of message texts sorted by date.
    """
    combined_text = ""

    # Iterate through sorted parent message IDs
    for parent_id, message_ids in message_threads.items():
        # Get the message texts for the message IDs and sort them by date
        message_texts = (
            data[data["message.id"].isin(message_ids)]
            .sort_values(by="date")["text"]
            .tolist()
        )
        # Stringify so NaN / non-string payloads join cleanly.
        message_texts = [str(elem) for elem in message_texts]

        # Combine the message texts
        combined_text += " ".join(message_texts) + ".\n"

    return combined_text.strip()
def load(self) -> List[Document]:
    """Load documents.

    If ``chat_entity`` is configured, the chat history is first fetched
    from the Telegram API into ``self.file_path``; the JSON dump is then
    parsed, messages are grouped into threads, and the combined text is
    converted into documents.
    """
    if self.chat_entity is not None:
        try:
            import nest_asyncio

            nest_asyncio.apply()
            asyncio.run(self.fetch_data_from_telegram())
        except ImportError:
            raise ImportError(
                """`nest_asyncio` package not found.
                please install with `pip install nest_asyncio`
                """
            )

    with open(Path(self.file_path), encoding="utf8") as f:
        raw_messages = json.load(f)

    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            """`pandas` package not found.
            please install with `pip install pandas`
            """
        )

    # Flatten nested message records into columns, then thread and combine.
    df = pd.DataFrame(pd.json_normalize(raw_messages))
    threads = self._get_message_threads(df)
    combined = self._combine_message_texts(threads, df)

    return text_to_docs(combined)
# For backwards compatibilityTelegramChatLoader=TelegramChatFileLoader