import importlib.metadata
import logging
import os
import traceback
import warnings
from contextvars import ContextVar
from typing import Any, Dict, List, Union, cast
from uuid import UUID
import requests
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult
from packaging.version import parse
logger = logging.getLogger(__name__)
DEFAULT_API_URL = "https://app.llmonitor.com"
user_ctx = ContextVar[Union[str, None]]("user_ctx", default=None)
user_props_ctx = ContextVar[Union[str, None]]("user_props_ctx", default=None)
PARAMS_TO_CAPTURE = [
"temperature",
"top_p",
"top_k",
"stop",
"presence_penalty",
"frequence_penalty",
"seed",
"function_call",
"functions",
"tools",
"tool_choice",
"response_format",
"max_tokens",
"logit_bias",
]
[docs]
class UserContextManager:
"""Context manager for LLMonitor user context."""
[docs]
def __init__(self, user_id: str, user_props: Any = None) -> None:
user_ctx.set(user_id)
user_props_ctx.set(user_props)
def __enter__(self) -> Any:
pass
def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> Any:
user_ctx.set(None)
user_props_ctx.set(None)
[docs]
def identify(user_id: str, user_props: Any = None) -> UserContextManager:
"""Builds an LLMonitor UserContextManager
Parameters:
- `user_id`: The user id.
- `user_props`: The user properties.
Returns:
A context manager that sets the user context.
"""
return UserContextManager(user_id, user_props)
def _serialize(obj: Any) -> Union[Dict[str, Any], List[Any], Any]:
if hasattr(obj, "to_json"):
return obj.to_json()
if isinstance(obj, dict):
return {key: _serialize(value) for key, value in obj.items()}
if isinstance(obj, list):
return [_serialize(element) for element in obj]
return obj
def _parse_input(raw_input: Any) -> Any:
if not raw_input:
return None
# if it's an array of 1, just parse the first element
if isinstance(raw_input, list) and len(raw_input) == 1:
return _parse_input(raw_input[0])
if not isinstance(raw_input, dict):
return _serialize(raw_input)
input_value = raw_input.get("input")
inputs_value = raw_input.get("inputs")
question_value = raw_input.get("question")
query_value = raw_input.get("query")
if input_value:
return input_value
if inputs_value:
return inputs_value
if question_value:
return question_value
if query_value:
return query_value
return _serialize(raw_input)
def _parse_output(raw_output: dict) -> Any:
if not raw_output:
return None
if not isinstance(raw_output, dict):
return _serialize(raw_output)
text_value = raw_output.get("text")
output_value = raw_output.get("output")
output_text_value = raw_output.get("output_text")
answer_value = raw_output.get("answer")
result_value = raw_output.get("result")
if text_value:
return text_value
if answer_value:
return answer_value
if output_value:
return output_value
if output_text_value:
return output_text_value
if result_value:
return result_value
return _serialize(raw_output)
def _parse_lc_role(
role: str,
) -> str:
if role == "human":
return "user"
else:
return role
def _get_user_id(metadata: Any) -> Any:
if user_ctx.get() is not None:
return user_ctx.get()
metadata = metadata or {}
user_id = metadata.get("user_id")
if user_id is None:
user_id = metadata.get("userId") # legacy, to delete in the future
return user_id
def _get_user_props(metadata: Any) -> Any:
if user_props_ctx.get() is not None:
return user_props_ctx.get()
metadata = metadata or {}
return metadata.get("user_props", None)
def _parse_lc_message(message: BaseMessage) -> Dict[str, Any]:
keys = ["function_call", "tool_calls", "tool_call_id", "name"]
parsed = {"text": message.content, "role": _parse_lc_role(message.type)}
parsed.update(
{
key: cast(Any, message.additional_kwargs.get(key))
for key in keys
if message.additional_kwargs.get(key) is not None
}
)
return parsed
def _parse_lc_messages(messages: Union[List[BaseMessage], Any]) -> List[Dict[str, Any]]:
return [_parse_lc_message(message) for message in messages]
[docs]
class LLMonitorCallbackHandler(BaseCallbackHandler):
"""Callback Handler for LLMonitor`.
#### Parameters:
- `app_id`: The app id of the app you want to report to. Defaults to
`None`, which means that `LLMONITOR_APP_ID` will be used.
- `api_url`: The url of the LLMonitor API. Defaults to `None`,
which means that either `LLMONITOR_API_URL` environment variable
or `https://app.llmonitor.com` will be used.
#### Raises:
- `ValueError`: if `app_id` is not provided either as an
argument or as an environment variable.
- `ConnectionError`: if the connection to the API fails.
#### Example:
```python
from langchain_community.llms import OpenAI
from langchain_community.callbacks import LLMonitorCallbackHandler
llmonitor_callback = LLMonitorCallbackHandler()
llm = OpenAI(callbacks=[llmonitor_callback],
metadata={"userId": "user-123"})
llm.invoke("Hello, how are you?")
```
"""
__api_url: str
__app_id: str
__verbose: bool
__llmonitor_version: str
__has_valid_config: bool
[docs]
def __init__(
self,
app_id: Union[str, None] = None,
api_url: Union[str, None] = None,
verbose: bool = False,
) -> None:
super().__init__()
self.__has_valid_config = True
try:
import llmonitor
self.__llmonitor_version = importlib.metadata.version("llmonitor")
self.__track_event = llmonitor.track_event
except ImportError:
logger.warning(
"""[LLMonitor] To use the LLMonitor callback handler you need to
have the `llmonitor` Python package installed. Please install it
with `pip install llmonitor`"""
)
self.__has_valid_config = False
return
if parse(self.__llmonitor_version) < parse("0.0.32"):
logger.warning(
f"""[LLMonitor] The installed `llmonitor` version is
{self.__llmonitor_version}
but `LLMonitorCallbackHandler` requires at least version 0.0.32
upgrade `llmonitor` with `pip install --upgrade llmonitor`"""
)
self.__has_valid_config = False
self.__has_valid_config = True
self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL
self.__verbose = verbose or bool(os.getenv("LLMONITOR_VERBOSE"))
_app_id = app_id or os.getenv("LLMONITOR_APP_ID")
if _app_id is None:
logger.warning(
"""[LLMonitor] app_id must be provided either as an argument or
as an environment variable"""
)
self.__has_valid_config = False
else:
self.__app_id = _app_id
if self.__has_valid_config is False:
return None
try:
res = requests.get(f"{self.__api_url}/api/app/{self.__app_id}")
if not res.ok:
raise ConnectionError()
except Exception:
logger.warning(
f"""[LLMonitor] Could not connect to the LLMonitor API at
{self.__api_url}"""
)
[docs]
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
tags: Union[List[str], None] = None,
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
params = kwargs.get("invocation_params", {})
params.update(
serialized.get("kwargs", {})
) # Sometimes, for example with ChatAnthropic, `invocation_params` is empty
name = (
params.get("model")
or params.get("model_name")
or params.get("model_id")
)
if not name and "anthropic" in params.get("_type"):
name = "claude-2"
extra = {
param: params.get(param)
for param in PARAMS_TO_CAPTURE
if params.get(param) is not None
}
input = _parse_input(prompts)
self.__track_event(
"llm",
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
extra=extra,
metadata=metadata,
user_props=user_props,
app_id=self.__app_id,
)
except Exception as e:
warnings.warn(f"[LLMonitor] An error occurred in on_llm_start: {e}")
[docs]
def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
tags: Union[List[str], None] = None,
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
params = kwargs.get("invocation_params", {})
params.update(
serialized.get("kwargs", {})
) # Sometimes, for example with ChatAnthropic, `invocation_params` is empty
name = (
params.get("model")
or params.get("model_name")
or params.get("model_id")
)
if not name and "anthropic" in params.get("_type"):
name = "claude-2"
extra = {
param: params.get(param)
for param in PARAMS_TO_CAPTURE
if params.get(param) is not None
}
input = _parse_lc_messages(messages[0])
self.__track_event(
"llm",
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
extra=extra,
metadata=metadata,
user_props=user_props,
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_chat_model_start: {e}")
[docs]
def on_llm_end(
self,
response: LLMResult,
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
token_usage = (response.llm_output or {}).get("token_usage", {})
parsed_output: Any = [
_parse_lc_message(generation.message)
if hasattr(generation, "message")
else generation.text
for generation in response.generations[0]
]
# if it's an array of 1, just parse the first element
if len(parsed_output) == 1:
parsed_output = parsed_output[0]
self.__track_event(
"llm",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=parsed_output,
token_usage={
"prompt": token_usage.get("prompt_tokens"),
"completion": token_usage.get("completion_tokens"),
},
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_llm_end: {e}")
[docs]
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
tags: Union[List[str], None] = None,
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
name = serialized.get("id", [None, None, None, None])[3]
type = "chain"
metadata = metadata or {}
agentName = metadata.get("agent_name")
if agentName is None:
agentName = metadata.get("agentName")
if name == "AgentExecutor" or name == "PlanAndExecute":
type = "agent"
if agentName is not None:
type = "agent"
name = agentName
if parent_run_id is not None:
type = "chain"
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
input = _parse_input(inputs)
self.__track_event(
type,
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
metadata=metadata,
user_props=user_props,
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_chain_start: {e}")
[docs]
def on_chain_end(
self,
outputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
output = _parse_output(outputs)
self.__track_event(
"chain",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=output,
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_chain_end: {e}")
[docs]
def on_agent_action(
self,
action: AgentAction,
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
name = action.tool
input = _parse_input(action.tool_input)
self.__track_event(
"tool",
"start",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_agent_action: {e}")
[docs]
def on_agent_finish(
self,
finish: AgentFinish,
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
output = _parse_output(finish.return_values)
self.__track_event(
"agent",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=output,
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_agent_finish: {e}")
[docs]
def on_chain_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
self.__track_event(
"chain",
"error",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
error={"message": str(error), "stack": traceback.format_exc()},
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_chain_error: {e}")
[docs]
def on_llm_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
self.__track_event(
"llm",
"error",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
error={"message": str(error), "stack": traceback.format_exc()},
app_id=self.__app_id,
)
except Exception as e:
logger.error(f"[LLMonitor] An error occurred in on_llm_error: {e}")
__all__ = ["LLMonitorCallbackHandler", "identify"]