from __future__ import annotations
import inspect
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import dataclass, field
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
NamedTuple,
Optional,
Protocol,
TypedDict,
Union,
overload,
)
from uuid import UUID, uuid4
from pydantic import BaseModel
from langchain_core.utils.pydantic import _IgnoreUnserializable, is_basemodel_subclass
if TYPE_CHECKING:
from langchain_core.runnables.base import Runnable as RunnableType
[docs]
class Stringifiable(Protocol):
    """Structural type for any object that can be rendered as a string."""

    def __str__(self) -> str:
        """Return the string form of the object."""
        ...
[docs]
class LabelsDict(TypedDict):
    """Dictionary of labels for nodes and edges in a graph.

    Parameters:
        nodes: Labels for nodes.
        edges: Labels for edges.
    """

    nodes: dict[str, str]
    edges: dict[str, str]
[docs]
def is_uuid(value: str) -> bool:
    """Tell whether a string can be parsed as a UUID.

    Args:
        value: The string to check.

    Returns:
        True if ``UUID(value)`` accepts the string, False if it raises
        ``ValueError``.
    """
    try:
        UUID(value)
    except ValueError:
        return False
    return True
[docs]
class Edge(NamedTuple):
    """Directed connection between two nodes of a graph.

    Parameters:
        source: The source node id.
        target: The target node id.
        data: Optional data associated with the edge. Defaults to None.
        conditional: Whether the edge is conditional. Defaults to False.
    """

    source: str
    target: str
    data: Optional[Stringifiable] = None
    conditional: bool = False

    def copy(
        self, *, source: Optional[str] = None, target: Optional[str] = None
    ) -> Edge:
        """Build a duplicate of this edge, optionally re-pointing its endpoints.

        Args:
            source: The new source node id. Defaults to None.
            target: The new target node id. Defaults to None.

        Returns:
            A new edge carrying the same data and conditional flag. A falsy
            ``source`` or ``target`` keeps the current endpoint.
        """
        new_source = source if source else self.source
        new_target = target if target else self.target
        return Edge(new_source, new_target, self.data, self.conditional)
[docs]
class Node(NamedTuple):
    """Node in a graph.

    Parameters:
        id: The unique identifier of the node.
        name: The name of the node.
        data: The data of the node.
        metadata: Optional metadata for the node. Defaults to None.
    """

    id: str
    name: str
    data: Union[type[BaseModel], RunnableType]
    # The default makes the field match its documented contract; without it,
    # constructing a node without metadata raised TypeError.
    metadata: Optional[dict[str, Any]] = None

    def copy(self, *, id: Optional[str] = None, name: Optional[str] = None) -> Node:
        """Return a copy of the node with optional new id and name.

        Args:
            id: The new node id. Defaults to None.
            name: The new node name. Defaults to None.

        Returns:
            A copy of the node with the new id and name. A falsy ``id`` or
            ``name`` keeps the current value; data and metadata are carried
            over unchanged.
        """
        return Node(
            id=id or self.id,
            name=name or self.name,
            data=self.data,
            metadata=self.metadata,
        )
[docs]
class Branch(NamedTuple):
    """Branch in a graph.

    Parameters:
        condition: A callable that returns a string representation of the condition.
        ends: Optional dictionary of end node ids for the branches. Defaults
            to None.
    """

    condition: Callable[..., str]
    # The default makes the field match its documented contract; without it,
    # ``Branch(condition)`` raised TypeError.
    ends: Optional[dict[str, str]] = None
[docs]
class CurveStyle(Enum):
    """Curve styles that can be requested when drawing with Mermaid.

    Each member's value is the curve name string handed to the renderer.
    """

    BASIS = "basis"
    BUMP_X = "bumpX"
    BUMP_Y = "bumpY"
    CARDINAL = "cardinal"
    CATMULL_ROM = "catmullRom"
    LINEAR = "linear"
    MONOTONE_X = "monotoneX"
    MONOTONE_Y = "monotoneY"
    NATURAL = "natural"
    STEP = "step"
    STEP_AFTER = "stepAfter"
    STEP_BEFORE = "stepBefore"
[docs]
@dataclass
class NodeStyles:
    """Schema of style declarations (e.g. hexadecimal fill colors) per node type.

    Parameters:
        default: The style for ordinary nodes.
            Defaults to "fill:#f2f0ff,line-height:1.2".
        first: The style for the first node. Defaults to "fill-opacity:0".
        last: The style for the last node. Defaults to "fill:#bfb6fc".
    """

    default: str = "fill:#f2f0ff,line-height:1.2"
    first: str = "fill-opacity:0"
    last: str = "fill:#bfb6fc"
[docs]
class MermaidDrawMethod(Enum):
    """Rendering back-ends available for turning Mermaid syntax into an image."""

    # Uses Pyppeteer to render the graph
    PYPPETEER = "pyppeteer"
    # Uses Mermaid.INK API to render the graph
    API = "api"
[docs]
def node_data_str(id: str, data: Union[type[BaseModel], RunnableType]) -> str:
    """Derive a display string for a node from its id and data.

    Args:
        id: The node id.
        data: The node data.

    Returns:
        The id itself when it is not a UUID. Otherwise the name of the data
        (``get_name()`` for a Runnable, ``__name__`` for anything else) with
        a leading "Runnable" removed.
    """
    from langchain_core.runnables.base import Runnable

    # A non-UUID id was chosen explicitly, so it is already the label.
    if not is_uuid(id):
        return id

    label = data.get_name() if isinstance(data, Runnable) else data.__name__

    if label.startswith("Runnable"):
        return label[len("Runnable") :]
    return label
[docs]
def node_data_json(
    node: Node, *, with_schemas: bool = False
) -> dict[str, Union[str, dict[str, Any]]]:
    """Describe a node's data as a dictionary tagged with its type.

    Args:
        node: The node to convert.
        with_schemas: Whether to include the schema of the data if
            it is a Pydantic model. Defaults to False.

    Returns:
        A dictionary with a "type" key ("runnable", "schema" or "unknown"),
        a "data" key, and a "metadata" key when the node has metadata.
    """
    from langchain_core.load.serializable import to_json_not_implemented
    from langchain_core.runnables.base import Runnable, RunnableSerializable

    data = node.data
    payload: dict[str, Any]

    if isinstance(data, RunnableSerializable):
        payload = {
            "type": "runnable",
            "data": {
                "id": data.lc_id(),
                "name": node_data_str(node.id, data),
            },
        }
    elif isinstance(data, Runnable):
        # Runnables that are not serializable take their id from the
        # "not implemented" serialization stub.
        payload = {
            "type": "runnable",
            "data": {
                "id": to_json_not_implemented(data)["id"],
                "name": node_data_str(node.id, data),
            },
        }
    elif inspect.isclass(data) and is_basemodel_subclass(data):
        if with_schemas:
            schema_data = data.model_json_schema(
                schema_generator=_IgnoreUnserializable
            )
        else:
            schema_data = node_data_str(node.id, data)
        payload = {"type": "schema", "data": schema_data}
    else:
        payload = {
            "type": "unknown",
            "data": node_data_str(node.id, data),
        }

    if node.metadata is not None:
        payload["metadata"] = node.metadata
    return payload
[docs]
@dataclass
class Graph:
    """Collection of nodes, keyed by id, together with the edges joining them.

    Parameters:
        nodes: Dictionary of nodes in the graph. Defaults to an empty dictionary.
        edges: List of edges in the graph. Defaults to an empty list.
    """

    # Each instance gets its own containers through the default factories.
    nodes: dict[str, Node] = field(default_factory=dict)
    edges: list[Edge] = field(default_factory=list)
[docs]
def to_json(self, *, with_schemas: bool = False) -> dict[str, list[dict[str, Any]]]:
    """Convert the graph to a dictionary of node and edge descriptions.

    Args:
        with_schemas: Whether to include the schemas of the nodes if they are
            Pydantic models. Defaults to False.

    Returns:
        A dictionary with the nodes and edges of the graph.
    """
    # UUID ids are swapped for the node's position so the output is stable;
    # other ids are kept as they are.
    stable_ids = {}
    for position, node in enumerate(self.nodes.values()):
        stable_ids[node.id] = position if is_uuid(node.id) else node.id

    serialized_edges: list[dict[str, Any]] = []
    for edge in self.edges:
        entry = {
            "source": stable_ids[edge.source],
            "target": stable_ids[edge.target],
        }
        # Optional fields are only emitted when they carry information.
        if edge.data is not None:
            entry["data"] = edge.data
        if edge.conditional:
            entry["conditional"] = True
        serialized_edges.append(entry)

    serialized_nodes = [
        {
            "id": stable_ids[node.id],
            **node_data_json(node, with_schemas=with_schemas),
        }
        for node in self.nodes.values()
    ]
    return {"nodes": serialized_nodes, "edges": serialized_edges}
def __bool__(self) -> bool:
    """Report whether the graph has at least one node."""
    return len(self.nodes) > 0
[docs]
def next_id(self) -> str:
    """Generate a fresh identifier suitable for adding a node to the graph.

    Returns:
        The hexadecimal form of a newly generated random UUID.
    """
    fresh = uuid4()
    return fresh.hex
[docs]
def add_node(
    self,
    data: Union[type[BaseModel], RunnableType],
    id: Optional[str] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
) -> Node:
    """Create a node, register it in the graph and hand it back.

    Args:
        data: The data of the node.
        id: The id of the node. Defaults to None, in which case a new id
            is generated.
        metadata: Optional metadata for the node. Defaults to None.

    Returns:
        The node that was added to the graph.

    Raises:
        ValueError: If a node with the same id already exists.
    """
    already_present = id is not None and id in self.nodes
    if already_present:
        msg = f"Node with id {id} already exists"
        raise ValueError(msg)

    node_id = id or self.next_id()
    created = Node(
        id=node_id,
        name=node_data_str(node_id, data),
        data=data,
        metadata=metadata,
    )
    self.nodes[created.id] = created
    return created
[docs]
def remove_node(self, node: Node) -> None:
    """Delete a node and every edge that starts or ends at it.

    Args:
        node: The node to remove.
    """
    removed_id = node.id
    self.nodes.pop(removed_id)
    # Keep only the edges that do not touch the removed node at either end.
    surviving = []
    for edge in self.edges:
        if edge.source == removed_id or edge.target == removed_id:
            continue
        surviving.append(edge)
    self.edges = surviving
[docs]
def add_edge(
    self,
    source: Node,
    target: Node,
    data: Optional[Stringifiable] = None,
    conditional: bool = False,
) -> Edge:
    """Connect two nodes of the graph with a new edge and return that edge.

    Args:
        source: The source node of the edge.
        target: The target node of the edge.
        data: Optional data associated with the edge. Defaults to None.
        conditional: Whether the edge is conditional. Defaults to False.

    Returns:
        The edge that was added to the graph.

    Raises:
        ValueError: If the source or target node is not in the graph.
    """
    known_ids = self.nodes
    # Both endpoints must already be registered; the source is checked first.
    if source.id not in known_ids:
        msg = f"Source node {source.id} not in graph"
        raise ValueError(msg)
    if target.id not in known_ids:
        msg = f"Target node {target.id} not in graph"
        raise ValueError(msg)

    new_edge = Edge(
        source=source.id,
        target=target.id,
        data=data,
        conditional=conditional,
    )
    self.edges.append(new_edge)
    return new_edge
[docs]
def extend(
    self, graph: Graph, *, prefix: str = ""
) -> tuple[Optional[Node], Optional[Node]]:
    """Copy every node and edge of another graph into this one.

    Note this doesn't check for duplicates, nor does it connect the graphs.

    Args:
        graph: The graph to add.
        prefix: The prefix to add to the node ids. Defaults to "". It is
            ignored when every node id of ``graph`` is a UUID.

    Returns:
        A tuple of the first and last nodes of the subgraph.
    """
    # UUID ids are already unique, so prefixing them is skipped.
    if all(is_uuid(node.id) for node in graph.nodes.values()):
        prefix = ""

    def prefixed(id: str) -> str:
        if prefix:
            return f"{prefix}:{id}"
        return id

    # prefix each node; the renamed nodes are collected before self is mutated
    renamed_nodes = {}
    for node_id, node in graph.nodes.items():
        new_id = prefixed(node_id)
        renamed_nodes[new_id] = node.copy(id=new_id)
    self.nodes.update(renamed_nodes)

    # prefix each edge's source and target
    renamed_edges = []
    for edge in graph.edges:
        renamed_edges.append(
            edge.copy(source=prefixed(edge.source), target=prefixed(edge.target))
        )
    self.edges.extend(renamed_edges)

    # return (prefixed) first and last nodes of the subgraph
    first = graph.first_node()
    last = graph.last_node()
    prefixed_first = first.copy(id=prefixed(first.id)) if first else None
    prefixed_last = last.copy(id=prefixed(last.id)) if last else None
    return (prefixed_first, prefixed_last)
[docs]
def reid(self) -> Graph:
    """Return a new graph with all nodes re-identified,
    using their unique, readable names where possible.

    Nodes whose id is a UUID get their name as id; when several nodes share
    a name, a ``_1``, ``_2``, ... suffix is appended. Non-UUID ids are kept.
    """
    ids_by_name = defaultdict(list)
    for node in self.nodes.values():
        ids_by_name[node.name].append(node.id)

    unique_labels = {}
    for node_name, node_ids in ids_by_name.items():
        if len(node_ids) == 1:
            unique_labels[node_ids[0]] = node_name
            continue
        for index, node_id in enumerate(node_ids, start=1):
            unique_labels[node_id] = f"{node_name}_{index}"

    def _get_node_id(node_id: str) -> str:
        # The label is looked up unconditionally, so an unknown id raises
        # KeyError even when it is not a UUID.
        label = unique_labels[node_id]
        return label if is_uuid(node_id) else node_id

    relabelled_nodes = {}
    for old_id, node in self.nodes.items():
        new_id = _get_node_id(old_id)
        relabelled_nodes[new_id] = node.copy(id=new_id)

    relabelled_edges = [
        edge.copy(
            source=_get_node_id(edge.source),
            target=_get_node_id(edge.target),
        )
        for edge in self.edges
    ]
    return Graph(nodes=relabelled_nodes, edges=relabelled_edges)
[docs]
def first_node(self) -> Optional[Node]:
    """Return the one node that no edge points to.

    If there is no such node, or there are multiple, return None.
    When drawing the graph, this node would be the origin.
    """
    origin = _first_node(self)
    return origin
[docs]
def last_node(self) -> Optional[Node]:
    """Return the one node that no edge starts from.

    If there is no such node, or there are multiple, return None.
    When drawing the graph, this node would be the destination.
    """
    destination = _last_node(self)
    return destination
[docs]
def trim_first_node(self) -> None:
    """Remove the first node if it exists and has a single outgoing edge,
    i.e., if removing it would not leave the graph without a "first" node.
    """
    candidate = self.first_node()
    if not candidate:
        return
    # Ignoring the candidate, the graph must still have a unique first node.
    if not _first_node(self, exclude=[candidate.id]):
        return
    outgoing = {edge for edge in self.edges if edge.source == candidate.id}
    if len(outgoing) == 1:
        self.remove_node(candidate)
[docs]
def trim_last_node(self) -> None:
    """Remove the last node if it exists and has a single incoming edge,
    i.e., if removing it would not leave the graph without a "last" node.
    """
    candidate = self.last_node()
    if not candidate:
        return
    # Ignoring the candidate, the graph must still have a unique last node.
    if not _last_node(self, exclude=[candidate.id]):
        return
    incoming = {edge for edge in self.edges if edge.target == candidate.id}
    if len(incoming) == 1:
        self.remove_node(candidate)
[docs]
def draw_ascii(self) -> str:
    """Render the graph as an ASCII art string.

    Returns:
        The drawing produced from the node id-to-name mapping and the edges.
    """
    from langchain_core.runnables.graph_ascii import draw_ascii

    names_by_id = {}
    for node in self.nodes.values():
        names_by_id[node.id] = node.name
    return draw_ascii(names_by_id, self.edges)
[docs]
def print_ascii(self) -> None:
    """Write the ASCII art rendering of the graph to standard output."""
    rendering = self.draw_ascii()
    print(rendering)  # noqa: T201
@overload
def draw_png(
    self,
    output_file_path: str,
    fontname: Optional[str] = None,
    labels: Optional[LabelsDict] = None,
) -> None: ...


# The default mirrors the implementation, so that a call without a path
# (``graph.draw_png()``) matches this overload and is typed as returning bytes.
@overload
def draw_png(
    self,
    output_file_path: None = None,
    fontname: Optional[str] = None,
    labels: Optional[LabelsDict] = None,
) -> bytes: ...


def draw_png(
    self,
    output_file_path: Optional[str] = None,
    fontname: Optional[str] = None,
    labels: Optional[LabelsDict] = None,
) -> Union[bytes, None]:
    """Draw the graph as a PNG image.

    Args:
        output_file_path: The path to save the image to. If None, the image
            is not saved. Defaults to None.
        fontname: The name of the font to use. Defaults to None.
        labels: Optional labels for nodes and edges in the graph. Defaults to None.
            Node labels given here override the default label, which is
            the node's name.

    Returns:
        The PNG image as bytes if output_file_path is None, None otherwise.
    """
    from langchain_core.runnables.graph_png import PngDrawer

    # Every node is labelled with its name unless the caller overrides it.
    default_node_labels = {node.id: node.name for node in self.nodes.values()}
    return PngDrawer(
        fontname,
        LabelsDict(
            nodes={
                **default_node_labels,
                **(labels["nodes"] if labels is not None else {}),
            },
            edges=labels["edges"] if labels is not None else {},
        ),
    ).draw(self, output_file_path)
[docs]
def draw_mermaid(
    self,
    *,
    with_styles: bool = True,
    curve_style: CurveStyle = CurveStyle.LINEAR,
    node_colors: Optional[NodeStyles] = None,
    wrap_label_n_words: int = 9,
) -> str:
    """Render the graph as a Mermaid syntax string.

    The graph is re-identified with ``reid()`` first, so the drawing uses
    readable node ids where possible.

    Args:
        with_styles: Whether to include styles in the syntax. Defaults to True.
        curve_style: The style of the edges. Defaults to CurveStyle.LINEAR.
        node_colors: The colors of the nodes. Defaults to NodeStyles().
        wrap_label_n_words: The number of words to wrap the node labels at.
            Defaults to 9.

    Returns:
        The Mermaid syntax string.
    """
    from langchain_core.runnables.graph_mermaid import draw_mermaid

    readable = self.reid()
    origin = readable.first_node()
    destination = readable.last_node()
    origin_id = origin.id if origin else None
    destination_id = destination.id if destination else None

    return draw_mermaid(
        nodes=readable.nodes,
        edges=readable.edges,
        first_node=origin_id,
        last_node=destination_id,
        with_styles=with_styles,
        curve_style=curve_style,
        node_styles=node_colors,
        wrap_label_n_words=wrap_label_n_words,
    )
[docs]
def draw_mermaid_png(
    self,
    *,
    curve_style: CurveStyle = CurveStyle.LINEAR,
    node_colors: Optional[NodeStyles] = None,
    wrap_label_n_words: int = 9,
    output_file_path: Optional[str] = None,
    draw_method: MermaidDrawMethod = MermaidDrawMethod.API,
    background_color: str = "white",
    padding: int = 10,
) -> bytes:
    """Render the graph to a PNG image by way of its Mermaid syntax.

    Args:
        curve_style: The style of the edges. Defaults to CurveStyle.LINEAR.
        node_colors: The colors of the nodes. Defaults to NodeStyles().
        wrap_label_n_words: The number of words to wrap the node labels at.
            Defaults to 9.
        output_file_path: The path to save the image to. If None, the image
            is not saved. Defaults to None.
        draw_method: The method to use to draw the graph.
            Defaults to MermaidDrawMethod.API.
        background_color: The color of the background. Defaults to "white".
        padding: The padding around the graph. Defaults to 10.

    Returns:
        The PNG image as bytes.
    """
    from langchain_core.runnables.graph_mermaid import draw_mermaid_png

    # Step 1: produce the Mermaid source for this graph.
    syntax = self.draw_mermaid(
        curve_style=curve_style,
        node_colors=node_colors,
        wrap_label_n_words=wrap_label_n_words,
    )
    # Step 2: hand the source to the selected renderer.
    image = draw_mermaid_png(
        mermaid_syntax=syntax,
        output_file_path=output_file_path,
        draw_method=draw_method,
        background_color=background_color,
        padding=padding,
    )
    return image
def _first_node(graph: Graph, exclude: Sequence[str] = ()) -> Optional[Node]:
    """Locate the single node that no edge points to.

    Nodes whose id is in ``exclude`` are not candidates, and edges whose
    source is in ``exclude`` are ignored.
    If there is no such node, or there are multiple, return None.
    When drawing the graph, this node would be the origin.
    """
    targets = {edge.target for edge in graph.edges if edge.source not in exclude}
    candidates = [
        node
        for node in graph.nodes.values()
        if not (node.id in exclude or node.id in targets)
    ]
    if len(candidates) != 1:
        return None
    return candidates[0]
def _last_node(graph: Graph, exclude: Sequence[str] = ()) -> Optional[Node]:
    """Locate the single node that no edge starts from.

    Nodes whose id is in ``exclude`` are not candidates, and edges whose
    target is in ``exclude`` are ignored.
    If there is no such node, or there are multiple, return None.
    When drawing the graph, this node would be the destination.
    """
    sources = {edge.source for edge in graph.edges if edge.target not in exclude}
    candidates = [
        node
        for node in graph.nodes.values()
        if not (node.id in exclude or node.id in sources)
    ]
    if len(candidates) != 1:
        return None
    return candidates[0]