class NotionDBLoader(BaseLoader):
    """Load from `Notion DB`.

    Reads content from pages within a Notion Database.

    Args:
        integration_token (str): Notion integration token.
        database_id (str): Notion database id.
        request_timeout_sec (int): Timeout for Notion requests in seconds.
            Defaults to 10.
        filter_object (Dict[str, Any]): Filter object used to limit returned
            entries based on specified criteria.
            E.g.: {
                "timestamp": "last_edited_time",
                "last_edited_time": {
                    "on_or_after": "2024-02-07"
                }
            } -> will only return entries that were last edited
                on or after 2024-02-07
            Notion docs: https://developers.notion.com/reference/post-database-query-filter
            Defaults to None, which will return ALL entries.
    """

    def __init__(
        self,
        integration_token: str,
        database_id: str,
        request_timeout_sec: Optional[int] = 10,
        *,
        filter_object: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize with parameters.

        Raises:
            ValueError: If ``integration_token`` or ``database_id`` is empty.
        """
        if not integration_token:
            raise ValueError("integration_token must be provided")
        if not database_id:
            raise ValueError("database_id must be provided")

        self.token = integration_token
        self.database_id = database_id
        self.headers = {
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }
        self.request_timeout_sec = request_timeout_sec
        # Normalise "no filter" to an empty dict so later truthiness checks
        # treat None and {} the same way.
        self.filter_object = filter_object or {}

    def load(self) -> List[Document]:
        """Load documents from the Notion database.

        Returns:
            List[Document]: List of documents.
        """
        page_summaries = self._retrieve_page_summaries()
        return [self.load_page(page_summary) for page_summary in page_summaries]

    def _retrieve_page_summaries(
        self, query_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Get all the pages from a Notion database
        OR filter based on specified criteria.

        Args:
            query_dict: Base request payload. Defaults to
                ``{"page_size": 100}`` when omitted.

        Returns:
            The concatenated ``results`` entries of every response page.
        """
        # Work on a private copy: the pagination loop writes "start_cursor"
        # into the payload, and that must not leak into the caller's dict or
        # into later calls (the previous shared default dict kept the cursor
        # of the last call).
        payload: Dict[str, Any] = (
            {"page_size": 100} if query_dict is None else dict(query_dict)
        )

        pages: List[Dict[str, Any]] = []
        while True:
            data = self._request(
                DATABASE_URL.format(database_id=self.database_id),
                method="POST",
                query_dict=payload,
                filter_object=self.filter_object,
            )
            # Tolerate a missing/null "results" key instead of raising
            # TypeError from list.extend(None).
            pages.extend(data.get("results") or [])

            if not data.get("has_more"):
                break
            payload["start_cursor"] = data.get("next_cursor")

        return pages

    def load_page(self, page_summary: Dict[str, Any]) -> Document:
        """Read a page.

        Each entry of ``page_summary["properties"]`` is converted to a
        metadata value under the lower-cased property name; property types
        not handled below are stored as None. The page id is stored under
        ``"id"`` after all properties, so it wins over a property whose
        lower-cased name is also ``"id"``.

        Args:
            page_summary: Page summary from Notion API.

        Returns:
            Document whose content is the text of the page's blocks and whose
            metadata holds the converted properties plus the page id.
        """
        page_id = page_summary["id"]

        # load properties as metadata
        metadata: Dict[str, Any] = {}

        value: Any

        for prop_name, prop_data in page_summary["properties"].items():
            prop_type = prop_data["type"]

            if prop_type == "rich_text":
                value = self._concatenate_rich_text(prop_data["rich_text"])
            elif prop_type == "title":
                value = self._concatenate_rich_text(prop_data["title"])
            elif prop_type == "multi_select":
                value = (
                    [item["name"] for item in prop_data["multi_select"]]
                    if prop_data["multi_select"]
                    else []
                )
            elif prop_type == "url":
                value = prop_data["url"]
            elif prop_type == "unique_id":
                value = (
                    f"{prop_data['unique_id']['prefix']}"
                    f"-{prop_data['unique_id']['number']}"
                    if prop_data["unique_id"]
                    else None
                )
            elif prop_type == "status":
                value = prop_data["status"]["name"] if prop_data["status"] else None
            elif prop_type == "people":
                value = []
                if prop_data["people"]:
                    for item in prop_data["people"]:
                        name = item.get("name")
                        if not name:
                            logger.warning(
                                "Missing 'name' in 'people' property "
                                f"for page {page_id}"
                            )
                        # The entry is appended even when the name is
                        # missing, so the list keeps one slot per person.
                        value.append(name)
            elif prop_type == "date":
                value = prop_data["date"] if prop_data["date"] else None
            elif prop_type == "last_edited_time":
                value = (
                    prop_data["last_edited_time"]
                    if prop_data["last_edited_time"]
                    else None
                )
            elif prop_type == "created_time":
                value = (
                    prop_data["created_time"] if prop_data["created_time"] else None
                )
            elif prop_type == "checkbox":
                value = prop_data["checkbox"]
            elif prop_type == "email":
                value = prop_data["email"]
            elif prop_type == "number":
                value = prop_data["number"]
            elif prop_type == "select":
                value = prop_data["select"]["name"] if prop_data["select"] else None
            else:
                value = None

            metadata[prop_name.lower()] = value

        metadata["id"] = page_id

        return Document(page_content=self._load_blocks(page_id), metadata=metadata)

    def _load_blocks(self, block_id: str, num_tabs: int = 0) -> str:
        """Read a block and its children.

        Only results whose typed payload contains ``rich_text`` contribute
        text. Every text segment is prefixed with ``num_tabs`` tab
        characters, and children are read recursively one level deeper.

        Args:
            block_id: Id of the block (or page) whose children are requested.
            num_tabs: Current nesting depth, used as the tab-indent count.

        Returns:
            The collected text, one entry per line.
        """
        result_lines_arr: List[str] = []
        cur_block_id: Optional[str] = block_id

        while cur_block_id:
            data = self._request(BLOCK_URL.format(block_id=cur_block_id))

            for result in data["results"]:
                result_obj = result[result["type"]]

                if "rich_text" not in result_obj:
                    continue

                cur_result_text_arr: List[str] = []

                for rich_text in result_obj["rich_text"]:
                    if "text" in rich_text:
                        cur_result_text_arr.append(
                            "\t" * num_tabs + rich_text["text"]["content"]
                        )

                if result["has_children"]:
                    children_text = self._load_blocks(
                        result["id"], num_tabs=num_tabs + 1
                    )
                    cur_result_text_arr.append(children_text)

                result_lines_arr.append("\n".join(cur_result_text_arr))

            # NOTE(review): the response's "next_cursor" is reused as the
            # block id of the next request rather than being sent as a
            # pagination cursor for the same block. This looks inconsistent
            # with cursor-based pagination -- confirm against BLOCK_URL and
            # the Notion API before changing; behaviour is kept as-is here.
            cur_block_id = data.get("next_cursor")

        return "\n".join(result_lines_arr)

    def _request(
        self,
        url: str,
        method: str = "GET",
        query_dict: Optional[Dict[str, Any]] = None,
        *,
        filter_object: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Send a request with the loader's headers and return the JSON body.

        Args:
            url: Fully formatted endpoint URL.
            method: HTTP method name. Defaults to "GET".
            query_dict: Entries sent as the JSON payload. It is copied, so
                the caller's dict is never modified. Defaults to an empty
                payload.
            filter_object: When non-empty, added to the payload under the
                ``"filter"`` key.

        Returns:
            The decoded JSON response.

        Raises:
            Whatever ``raise_for_status()`` raises for an error response.
        """
        json_payload: Dict[str, Any] = dict(query_dict) if query_dict else {}
        if filter_object:
            json_payload["filter"] = filter_object

        res = requests.request(
            method,
            url,
            headers=self.headers,
            json=json_payload,
            timeout=self.request_timeout_sec,
        )
        res.raise_for_status()
        return res.json()

    def _concatenate_rich_text(self, rich_text_array: List[Dict[str, Any]]) -> str:
        """Concatenate all text content from a rich_text array."""
        return "".join(item["plain_text"] for item in rich_text_array)