# Source code for langchain_text_splitters.markdown
from __future__ import annotations
import re
from typing import Any, Dict, List, Tuple, TypedDict, Union
from langchain_core.documents import Document
from langchain_text_splitters.base import Language
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
[docs]
class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
    """Recursive character splitter preconfigured with Markdown separators."""

    def __init__(self, **kwargs: Any) -> None:
        """Build the splitter using the separator list for ``Language.MARKDOWN``.

        Args:
            **kwargs: Forwarded unchanged to the parent class initializer.
        """
        markdown_separators = self.get_separators_for_language(Language.MARKDOWN)
        super().__init__(separators=markdown_separators, **kwargs)
[docs]
class MarkdownHeaderTextSplitter:
    """Splitting markdown files based on specified headers."""

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        return_each_line: bool = False,
        strip_headers: bool = True,
    ):
        """Create a new MarkdownHeaderTextSplitter.

        Args:
            headers_to_split_on: ``(separator, metadata key)`` pairs for the
                headers we want to track, e.g. ``("#", "Header 1")``.
            return_each_line: Return each line w/ associated headers instead
                of aggregating lines that share the same metadata.
            strip_headers: Strip split headers from the content of the chunk.
        """
        # Output line-by-line or aggregated into chunks w/ common headers
        self.return_each_line = return_each_line
        # Given the headers we want to split on (e.g., "#", "##", etc.),
        # order them longest-first so the most specific separator is tried first.
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
        # Strip split headers from the content of the chunk
        self.strip_headers = strip_headers

    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Document]:
        """Combine lines with common metadata into chunks.

        Args:
            lines: Line of text / associated header metadata.

        Returns:
            One ``Document`` per aggregated chunk, in input order.
        """
        aggregated_chunks: List[LineType] = []

        for line in lines:
            previous = aggregated_chunks[-1] if aggregated_chunks else None

            if previous is not None and previous["metadata"] == line["metadata"]:
                # Same metadata as the previous chunk: extend its content.
                previous["content"] += " \n" + line["content"]
            elif (
                previous is not None
                and not self.strip_headers
                # may be issues if other metadata is present
                and len(previous["metadata"]) < len(line["metadata"])
                # startswith() instead of indexing [0]: the last line of the
                # previous content may be empty, which must not raise.
                and previous["content"].split("\n")[-1].startswith("#")
            ):
                # The previous chunk has different, shallower metadata, ends
                # with a header line, and headers are being kept: fold the
                # current content into it ...
                previous["content"] += " \n" + line["content"]
                # ... and let it take over the deeper metadata.
                previous["metadata"] = line["metadata"]
            else:
                # Otherwise start a new chunk. Store a shallow copy so the
                # in-place updates above never modify the caller's dicts.
                aggregated_chunks.append(
                    {"content": line["content"], "metadata": line["metadata"]}
                )

        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in aggregated_chunks
        ]

    def split_text(self, text: str) -> List[Document]:
        """Split markdown file.

        Args:
            text: Markdown file

        Returns:
            Documents carrying the header metadata in effect for their content;
            aggregated by common metadata unless ``return_each_line`` is set.
        """
        # Split the input text by newline character ("\n").
        lines = text.split("\n")
        # Final output
        lines_with_metadata: List[LineType] = []
        # Content and metadata of the chunk currently being processed
        current_content: List[str] = []
        current_metadata: Dict[str, str] = {}
        # Keep track of the nested header structure
        # (HeaderType is not declared in this part of the file; it is only
        # used as an annotation here.)
        header_stack: List[HeaderType] = []
        initial_metadata: Dict[str, str] = {}

        in_code_block = False
        opening_fence = ""

        for line in lines:
            stripped_line = line.strip()
            # Remove all non-printable characters from the string, keeping only
            # visible text.
            stripped_line = "".join(filter(str.isprintable, stripped_line))

            if not in_code_block:
                # Exclude inline code spans
                if stripped_line.startswith("```") and stripped_line.count("```") == 1:
                    in_code_block = True
                    opening_fence = "```"
                elif stripped_line.startswith("~~~"):
                    in_code_block = True
                    opening_fence = "~~~"
            else:
                # Only the fence type that opened the block can close it.
                if stripped_line.startswith(opening_fence):
                    in_code_block = False
                    opening_fence = ""

            if in_code_block:
                # Lines inside a code block are never interpreted as headers.
                current_content.append(stripped_line)
                continue

            # Check each line against each of the header types (e.g., #, ##)
            for sep, name in self.headers_to_split_on:
                # Check if line starts with a header that we intend to split on
                if stripped_line.startswith(sep) and (
                    # Header with no text OR header is followed by space
                    # Both are valid conditions that sep is being used a header
                    len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                ):
                    # Ensure we are tracking the header as metadata
                    if name is not None:
                        # Get the current header level
                        current_header_level = sep.count("#")

                        # Pop out headers of lower or same level from the stack
                        while (
                            header_stack
                            and header_stack[-1]["level"] >= current_header_level
                        ):
                            # We have encountered a new header
                            # at the same or higher level
                            popped_header = header_stack.pop()
                            # Clear the metadata for the
                            # popped header in initial_metadata
                            if popped_header["name"] in initial_metadata:
                                initial_metadata.pop(popped_header["name"])

                        # Push the current header to the stack
                        header: HeaderType = {
                            "level": current_header_level,
                            "name": name,
                            "data": stripped_line[len(sep) :].strip(),
                        }
                        header_stack.append(header)
                        # Update initial_metadata with the current header
                        initial_metadata[name] = header["data"]

                    # Add the previous line to the lines_with_metadata
                    # only if current_content is not empty
                    if current_content:
                        lines_with_metadata.append(
                            {
                                "content": "\n".join(current_content),
                                "metadata": current_metadata.copy(),
                            }
                        )
                        current_content.clear()

                    if not self.strip_headers:
                        current_content.append(stripped_line)

                    break
            else:
                # No header separator matched this line.
                if stripped_line:
                    current_content.append(stripped_line)
                elif current_content:
                    # A blank line ends the content collected so far.
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

            # Content collected from here on belongs to the headers seen so far.
            current_metadata = initial_metadata.copy()

        if current_content:
            lines_with_metadata.append(
                {"content": "\n".join(current_content), "metadata": current_metadata}
            )

        # lines_with_metadata has each line with associated header metadata
        # aggregate these into chunks based on common metadata
        if not self.return_each_line:
            return self.aggregate_lines_to_chunks(lines_with_metadata)
        else:
            return [
                Document(page_content=chunk["content"], metadata=chunk["metadata"])
                for chunk in lines_with_metadata
            ]
[docs]
class LineType(TypedDict):
    """Typed dict pairing a piece of text with its header metadata."""

    # Header metadata applying to the text (string keys to string values).
    metadata: Dict[str, str]
    # The text content itself.
    content: str
[docs]
class ExperimentalMarkdownSyntaxTextSplitter:
    """
    An experimental text splitter for handling Markdown syntax.

    This splitter aims to retain the exact whitespace of the original text while
    extracting structured metadata, such as headers. It is a re-implementation of the
    MarkdownHeaderTextSplitter with notable changes to the approach and
    additional features.

    Key Features:
    - Retains the original whitespace and formatting of the Markdown text.
    - Extracts headers and code-block languages as metadata.
    - Splits out code blocks and includes the language in the "Code" metadata key.
    - Splits text on horizontal rules (`---`) as well.
    - Defaults to sensible splitting behavior, which can be overridden using the
      `headers_to_split_on` parameter.

    Parameters:
    ----------
    headers_to_split_on : List[Tuple[str, str]], optional
        Headers to split on, defaulting to common Markdown headers if not specified.
    return_each_line : bool, optional
        When set to True, returns each line as a separate chunk. Default is False.
    strip_headers : bool, optional
        When set to True, header lines are left out of the chunk content.
        Default is True.

    Usage example:
    --------------
    >>> headers_to_split_on = [
    >>>     ("#", "Header 1"),
    >>>     ("##", "Header 2"),
    >>> ]
    >>> splitter = ExperimentalMarkdownSyntaxTextSplitter(
    >>>     headers_to_split_on=headers_to_split_on
    >>> )
    >>> chunks = splitter.split_text(text)
    >>> for chunk in chunks:
    >>>     print(chunk)

    This class is currently experimental and subject to change based on feedback and
    further development.
    """

    DEFAULT_HEADER_KEYS = {
        "#": "Header 1",
        "##": "Header 2",
        "###": "Header 3",
        "####": "Header 4",
        "#####": "Header 5",
        "######": "Header 6",
    }

    # Patterns are compiled once at class creation instead of on every line.
    _HEADER_PATTERN = re.compile(r"^(#{1,6}) (.*)")
    _CODE_PATTERN = re.compile(r"^(?:```|~~~)(.*)")
    # Three or more of the same rule character, then only trailing whitespace.
    # The line terminator is optional so a rule on the last line (no newline)
    # or with CRLF endings is still recognised.
    _HORZ_PATTERN = re.compile(r"^(?:\*{3,}|-{3,}|_{3,})\s*$")

    def __init__(
        self,
        headers_to_split_on: Union[List[Tuple[str, str]], None] = None,
        return_each_line: bool = False,
        strip_headers: bool = True,
    ):
        """Initialize the splitter.

        Args:
            headers_to_split_on: ``(separator, metadata key)`` pairs; when empty
                or ``None``, ``DEFAULT_HEADER_KEYS`` is used.
            return_each_line: Return one document per non-blank line.
            strip_headers: Leave header lines out of the chunk content.
        """
        self.chunks: List[Document] = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack: List[Tuple[int, str]] = []
        self.strip_headers = strip_headers
        if headers_to_split_on:
            self.splittable_headers = dict(headers_to_split_on)
        else:
            self.splittable_headers = self.DEFAULT_HEADER_KEYS
        self.return_each_line = return_each_line

    def split_text(self, text: str) -> List[Document]:
        """Split Markdown ``text`` on headers, fenced code blocks and rules.

        Args:
            text: The Markdown text to split.

        Returns:
            The resulting documents; one per non-blank line when
            ``return_each_line`` is set.
        """
        # Start from a clean slate so results and header context from an
        # earlier call on the same instance do not leak into this one.
        self.chunks = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack = []

        raw_lines = text.splitlines(keepends=True)

        while raw_lines:
            raw_line = raw_lines.pop(0)

            header_match = self._match_header(raw_line)
            if header_match:
                # Finish the previous chunk under the old header stack.
                self._complete_chunk_doc()
                if not self.strip_headers:
                    self.current_chunk.page_content += raw_line
                # add the header to the stack
                header_depth = len(header_match.group(1))
                # Strip so "\r" or trailing spaces do not end up in metadata.
                header_text = header_match.group(2).strip()
                self._resolve_header_stack(header_depth, header_text)
                continue

            code_match = self._match_code(raw_line)
            if code_match:
                self._complete_chunk_doc()
                # Consumes the lines of the code block from raw_lines.
                self.current_chunk.page_content = self._resolve_code_chunk(
                    raw_line, raw_lines
                )
                self.current_chunk.metadata["Code"] = code_match.group(1).strip()
                self._complete_chunk_doc()
                continue

            if self._match_horz(raw_line):
                # The rule itself is dropped; it only separates chunks.
                self._complete_chunk_doc()
                continue

            self.current_chunk.page_content += raw_line

        self._complete_chunk_doc()

        # I don't see why `return_each_line` is a necessary feature of this splitter.
        # It's easy enough to to do outside of the class and the caller can have more
        # control over it.
        if self.return_each_line:
            return [
                # Copy so the per-line documents do not share one metadata dict.
                Document(page_content=line, metadata=dict(chunk.metadata))
                for chunk in self.chunks
                for line in chunk.page_content.splitlines()
                if line and not line.isspace()
            ]
        return self.chunks

    def _resolve_header_stack(self, header_depth: int, header_text: str) -> None:
        """Record a new header, dropping headers at the same or deeper level."""
        for i, (depth, _) in enumerate(self.current_header_stack):
            if depth >= header_depth:
                # Everything from here on is a sibling or descendant of an
                # earlier section and no longer applies.
                self.current_header_stack = self.current_header_stack[:i]
                break
        self.current_header_stack.append((header_depth, header_text))

    def _resolve_code_chunk(self, current_line: str, raw_lines: List[str]) -> str:
        """Collect a fenced code block, consuming its lines from ``raw_lines``.

        Args:
            current_line: The opening fence line.
            raw_lines: Remaining lines; the block's lines are removed in place.

        Returns:
            The opening line plus every line up to and including the closing
            fence. If the fence is never closed, all remaining lines are
            returned rather than being discarded.
        """
        # Only a fence of the same type as the opener may close the block.
        fence = current_line[:3]
        end = len(raw_lines)
        for index, raw_line in enumerate(raw_lines):
            if raw_line.startswith(fence):
                end = index + 1
                break
        block_lines = raw_lines[:end]
        del raw_lines[:end]
        return current_line + "".join(block_lines)

    def _complete_chunk_doc(self) -> None:
        """Store the current chunk (unless blank) and start a fresh one."""
        chunk_content = self.current_chunk.page_content
        # Discard any empty documents
        if chunk_content and not chunk_content.isspace():
            # Apply the header stack as metadata
            for depth, value in self.current_header_stack:
                header_key = self.splittable_headers.get("#" * depth)
                self.current_chunk.metadata[header_key] = value
            self.chunks.append(self.current_chunk)
        # Reset the current chunk
        self.current_chunk = Document(page_content="")

    # Match methods
    def _match_header(self, line: str) -> Union[re.Match, None]:
        """Return a match when ``line`` is one of the configured headers."""
        match = self._HEADER_PATTERN.match(line)
        # Only matches on the configured headers
        if match and match.group(1) in self.splittable_headers:
            return match
        return None

    def _match_code(self, line: str) -> Union[re.Match, None]:
        """Return a match when ``line`` starts with a ``` or ~~~ fence."""
        return self._CODE_PATTERN.match(line)

    def _match_horz(self, line: str) -> Union[re.Match, None]:
        """Return a match when ``line`` is a ``***``, ``---`` or ``___`` rule."""
        return self._HORZ_PATTERN.match(line)