Source code for langchain_community.callbacks.streamlit.mutable_expander
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional
if TYPE_CHECKING:
from streamlit.delta_generator import DeltaGenerator
from streamlit.type_util import SupportsStr
[docs]class ChildType(Enum):
"""Enumerator of the child type."""
MARKDOWN = "MARKDOWN"
EXCEPTION = "EXCEPTION"
[docs]class ChildRecord(NamedTuple):
"""Child record as a NamedTuple."""
type: ChildType
kwargs: Dict[str, Any]
dg: DeltaGenerator
[docs]class MutableExpander:
"""Streamlit expander that can be renamed and dynamically expanded/collapsed."""
[docs] def __init__(self, parent_container: DeltaGenerator, label: str, expanded: bool):
"""Create a new MutableExpander.
Parameters
----------
parent_container
The `st.container` that the expander will be created inside.
The expander transparently deletes and recreates its underlying
`st.expander` instance when its label changes, and it uses
`parent_container` to ensure it recreates this underlying expander in the
same location onscreen.
label
The expander's initial label.
expanded
The expander's initial `expanded` value.
"""
self._label = label
self._expanded = expanded
self._parent_cursor = parent_container.empty()
self._container = self._parent_cursor.expander(label, expanded)
self._child_records: List[ChildRecord] = []
@property
def label(self) -> str:
"""Expander's label string."""
return self._label
@property
def expanded(self) -> bool:
"""True if the expander was created with `expanded=True`."""
return self._expanded
[docs] def clear(self) -> None:
"""Remove the container and its contents entirely. A cleared container can't
be reused.
"""
self._container = self._parent_cursor.empty()
self._child_records.clear()
[docs] def append_copy(self, other: MutableExpander) -> None:
"""Append a copy of another MutableExpander's children to this
MutableExpander.
"""
other_records = other._child_records.copy()
for record in other_records:
self._create_child(record.type, record.kwargs)
[docs] def update(
self, *, new_label: Optional[str] = None, new_expanded: Optional[bool] = None
) -> None:
"""Change the expander's label and expanded state"""
if new_label is None:
new_label = self._label
if new_expanded is None:
new_expanded = self._expanded
if self._label == new_label and self._expanded == new_expanded:
# No change!
return
self._label = new_label
self._expanded = new_expanded
self._container = self._parent_cursor.expander(new_label, new_expanded)
prev_records = self._child_records
self._child_records = []
# Replay all children into the new container
for record in prev_records:
self._create_child(record.type, record.kwargs)
[docs] def markdown(
self,
body: SupportsStr,
unsafe_allow_html: bool = False,
*,
help: Optional[str] = None,
index: Optional[int] = None,
) -> int:
"""Add a Markdown element to the container and return its index."""
kwargs = {"body": body, "unsafe_allow_html": unsafe_allow_html, "help": help}
new_dg = self._get_dg(index).markdown(**kwargs)
record = ChildRecord(ChildType.MARKDOWN, kwargs, new_dg)
return self._add_record(record, index)
[docs] def exception(
self, exception: BaseException, *, index: Optional[int] = None
) -> int:
"""Add an Exception element to the container and return its index."""
kwargs = {"exception": exception}
new_dg = self._get_dg(index).exception(**kwargs)
record = ChildRecord(ChildType.EXCEPTION, kwargs, new_dg)
return self._add_record(record, index)
def _create_child(self, type: ChildType, kwargs: Dict[str, Any]) -> None:
"""Create a new child with the given params"""
if type == ChildType.MARKDOWN:
self.markdown(**kwargs)
elif type == ChildType.EXCEPTION:
self.exception(**kwargs)
else:
raise RuntimeError(f"Unexpected child type {type}")
def _add_record(self, record: ChildRecord, index: Optional[int]) -> int:
"""Add a ChildRecord to self._children. If `index` is specified, replace
the existing record at that index. Otherwise, append the record to the
end of the list.
Return the index of the added record.
"""
if index is not None:
# Replace existing child
self._child_records[index] = record
return index
# Append new child
self._child_records.append(record)
return len(self._child_records) - 1
def _get_dg(self, index: Optional[int]) -> DeltaGenerator:
if index is not None:
# Existing index: reuse child's DeltaGenerator
assert 0 <= index < len(self._child_records), f"Bad index: {index}"
return self._child_records[index].dg
# No index: use container's DeltaGenerator
return self._container