Source code for langchain_experimental.cpal.models

from __future__ import annotations  # allows pydantic model to reference itself

import re
from typing import Any, List, Optional, Union

from langchain_community.graphs.networkx_graph import NetworkxEntityGraph
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    PrivateAttr,
    field_validator,
    model_validator,
)

from langchain_experimental.cpal.constants import Constant


class NarrativeModel(BaseModel):
    """
    Narrative input as three story elements.
    """

    story_outcome_question: str
    story_hypothetical: str
    story_plot: str  # causal stack of operations

    @field_validator("*", mode="before")
    def empty_str_to_none(cls, v: str) -> Union[str, None]:
        """Empty strings are not allowed"""
        if v == "":
            return None
        return v

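# A minimal sketch, not part of the module: the story strings below are
# illustrative. Because the "*" validator above runs in mode="before", an
# empty string is coerced to None and then rejected by the `str` annotation,
# so blank story elements raise a ValidationError instead of slipping through.
from pydantic import ValidationError

try:
    NarrativeModel(
        story_outcome_question="",  # becomes None -> fails validation
        story_hypothetical="what if bud adopts one more pet?",
        story_plot="bud has three pets. pat has one more pet than bud.",
    )
except ValidationError:
    pass  # expected: empty story elements are not allowed
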
class EntityModel(BaseModel):
    """Entity in the story."""

    name: str = Field(description="entity name")
    code: str = Field(description="entity actions")
    value: float = Field(description="entity initial value")
    depends_on: List[str] = Field(default=[], description="ancestor entities")

    # TODO: generalize to multivariate math
    # TODO: acyclic graph

    model_config = ConfigDict(
        validate_assignment=True,
    )

    @field_validator("name")
    def lower_case_name(cls, v: str) -> str:
        v = v.lower()
        return v

class CausalModel(BaseModel):
    """Causal data."""

    attribute: str = Field(description="name of the attribute to be calculated")
    entities: List[EntityModel] = Field(description="entities in the story")

    # TODO: root validate each `entity.depends_on` using system's entity names

class EntitySettingModel(BaseModel):
    """Entity initial conditions.

    Initial conditions for an entity

    {"name": "bud", "attribute": "pet_count", "value": 12}
    """

    name: str = Field(description="name of the entity")
    attribute: str = Field(description="name of the attribute to be calculated")
    value: float = Field(description="entity's attribute value (calculated)")

    @field_validator("name")
    def lower_case_transform(cls, v: str) -> str:
        v = v.lower()
        return v

class SystemSettingModel(BaseModel):
    """System initial conditions.

    Initial global conditions for the system.

    {"parameter": "interest_rate", "value": .05}
    """

    parameter: str
    value: float

class InterventionModel(BaseModel):
    """Intervention data of the story aka initial conditions.

    >>> intervention.dict()
    {
        entity_settings: [
            {"name": "bud", "attribute": "pet_count", "value": 12},
            {"name": "pat", "attribute": "pet_count", "value": 0},
        ],
        system_settings: None,
    }
    """

    entity_settings: List[EntitySettingModel]
    system_settings: Optional[List[SystemSettingModel]] = None

    @field_validator("system_settings")
    def lower_case_name(cls, v: str) -> Union[str, None]:
        if v is not None:
            raise NotImplementedError("system_setting is not implemented yet")
        return v

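# A minimal usage sketch, not part of the module: "pet_count" and the entity
# names are illustrative. It builds the intervention shown in the docstring
# example above programmatically.
example_intervention = InterventionModel(
    entity_settings=[
        EntitySettingModel(name="Bud", attribute="pet_count", value=12),
        EntitySettingModel(name="Pat", attribute="pet_count", value=0),
    ]
)
# The `name` validator lower-cases entity names, so the first setting is
# stored as name="bud" with value 12.0 (coerced to float).
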
class QueryModel(BaseModel):
    """Query data of the story.

    Translate a question about the story outcome into a programmatic expression.
    """

    question: str = Field(  # type: ignore[literal-required]
        alias=Constant.narrative_input.value
    )  # input
    expression: str  # output, part of llm completion
    llm_error_msg: str  # output, part of llm completion
    _result_table: str = PrivateAttr()  # result of the executed query

class ResultModel(BaseModel):
    """Result of the story query."""

    question: str = Field(  # type: ignore[literal-required]
        alias=Constant.narrative_input.value
    )  # input
    _result_table: str = PrivateAttr()  # result of the executed query

class StoryModel(BaseModel):
    """Story data."""

    causal_operations: Any = Field()
    intervention: Any = Field()
    query: Any = Field()
    _outcome_table: Any = PrivateAttr(default=None)
    _networkx_wrapper: Any = PrivateAttr(default=None)

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self._compute()

    # TODO: when langchain adopts pydantic.v2 replace w/ `__post_init__`
    # misses hints github.com/pydantic/pydantic/issues/1729#issuecomment-1300576214

    @model_validator(mode="before")
    @classmethod
    def check_intervention_is_valid(cls, values: dict) -> Any:
        valid_names = [e.name for e in values["causal_operations"].entities]
        for setting in values["intervention"].entity_settings:
            if setting.name not in valid_names:
                error_msg = f"""
                    Hypothetical question has an invalid entity name.
                    `{setting.name}` not in `{valid_names}`
                """
                raise ValueError(error_msg)
        return values

    def _block_back_door_paths(self) -> None:
        # stop intervention entities from depending on others
        intervention_entities = [
            entity_setting.name for entity_setting in self.intervention.entity_settings
        ]
        for entity in self.causal_operations.entities:
            if entity.name in intervention_entities:
                entity.depends_on = []
                entity.code = "pass"

    def _set_initial_conditions(self) -> None:
        for entity_setting in self.intervention.entity_settings:
            for entity in self.causal_operations.entities:
                if entity.name == entity_setting.name:
                    entity.value = entity_setting.value

    def _make_graph(self) -> None:
        self._networkx_wrapper = NetworkxEntityGraph()
        for entity in self.causal_operations.entities:
            for parent_name in entity.depends_on:
                self._networkx_wrapper._graph.add_edge(
                    parent_name, entity.name, relation=entity.code
                )

        # TODO: is it correct to drop entities with no impact on the outcome (?)
        self.causal_operations.entities = [
            entity
            for entity in self.causal_operations.entities
            if entity.name in self._networkx_wrapper.get_topological_sort()
        ]

    def _sort_entities(self) -> None:
        # order the sequence of causal actions
        sorted_nodes = self._networkx_wrapper.get_topological_sort()
        self.causal_operations.entities.sort(key=lambda x: sorted_nodes.index(x.name))

    def _forward_propagate(self) -> None:
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError(
                "Unable to import pandas, please install with `pip install pandas`."
            ) from e

        entity_scope = {
            entity.name: entity for entity in self.causal_operations.entities
        }
        for entity in self.causal_operations.entities:
            if entity.code == "pass":
                continue
            else:
                # gist.github.com/dean0x7d/df5ce97e4a1a05be4d56d1378726ff92
                exec(entity.code, globals(), entity_scope)
        row_values = [entity.dict() for entity in entity_scope.values()]
        self._outcome_table = pd.DataFrame(row_values)

    def _run_query(self) -> None:
        def humanize_sql_error_msg(error: str) -> str:
            pattern = r"column\s+(.*?)\s+not found"
            col_match = re.search(pattern, error)
            if col_match:
                return (
                    "SQL error: "
                    + col_match.group(1)
                    + " is not an attribute in your story!"
                )
            else:
                return str(error)

        if self.query.llm_error_msg == "":
            try:
                import duckdb

                df = self._outcome_table  # noqa
                query_result = duckdb.sql(self.query.expression).df()
                self.query._result_table = query_result
            except duckdb.BinderException as e:
                self.query._result_table = humanize_sql_error_msg(str(e))
            except ImportError as e:
                raise ImportError(
                    "Unable to import duckdb, please install with `pip install duckdb`."
                ) from e
            except Exception as e:
                self.query._result_table = str(e)
        else:
            msg = "LLM maybe failed to translate question to SQL query."
            raise ValueError(
                {
                    "question": self.query.question,
                    "llm_error_msg": self.query.llm_error_msg,
                    "msg": msg,
                }
            )

    def _compute(self) -> Any:
        self._block_back_door_paths()
        self._set_initial_conditions()
        self._make_graph()
        self._sort_entities()
        self._forward_propagate()
        self._run_query()

    def print_debug_report(self) -> None:
        report = {
            "outcome": self._outcome_table,
            "query": self.query.dict(),
            "result": self.query._result_table,
        }
        from pprint import pprint

        pprint(report)

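# A hypothetical end-to-end sketch, not part of the module: it requires
# `pandas` and `duckdb`, and the entities, intervention, and SQL expression
# below are illustrative stand-ins for what the CPAL chain's LLM calls would
# normally produce.
causal_operations = CausalModel(
    attribute="pet_count",
    entities=[
        EntityModel(name="cindy", code="pass", value=0.0),
        EntityModel(
            name="marcia",
            code="marcia.value = cindy.value + 2",
            value=0.0,
            depends_on=["cindy"],
        ),
    ],
)
intervention = InterventionModel(
    entity_settings=[
        EntitySettingModel(name="cindy", attribute="pet_count", value=4.0),
    ]
)
query = QueryModel(
    **{Constant.narrative_input.value: "how many pets does marcia have?"},
    expression="SELECT name, value FROM df WHERE name = 'marcia'",
    llm_error_msg="",
)
# Constructing the story runs the full pipeline: back-door paths are blocked,
# initial conditions are set, the DAG is built and topologically sorted,
# values are forward-propagated, and the SQL query is executed against the
# outcome table (exposed to duckdb as the local dataframe `df`).
story = StoryModel(
    causal_operations=causal_operations,
    intervention=intervention,
    query=query,
)
story.print_debug_report()  # marcia ends up with 4 + 2 = 6 pets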