UpstashRatelimitHandler#

class langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitHandler(identifier: str, *, token_ratelimit: None = None, request_ratelimit: None = None, include_output_tokens: bool = False)[source]#

Callback to handle rate limiting based on the number of requests or the number of tokens in the input.

It uses Upstash Ratelimit to track the rate limit, which in turn stores its state in Upstash Redis.

Do not pass the handler to the chain when initialising the chain. The handler holds state that must be fresh every time invoke is called. Instead, initialise and pass a new handler every time you invoke.

Creates UpstashRatelimitHandler. Must be passed an identifier to ratelimit like a user id or an ip address.

Additionally, it must be passed at least one of token_ratelimit or request_ratelimit parameters.

Parameters:
  • identifier (str) – The identifier to rate limit, such as a user ID or an IP address.

  • token_ratelimit (Optional[Ratelimit]) – Ratelimit to limit the number of tokens. Only works with OpenAI models since only these models provide the number of tokens as information in their output.

  • request_ratelimit (Optional[Ratelimit]) – Ratelimit to limit the number of requests.

  • include_output_tokens (bool) – Whether to count output tokens when rate limiting based on number of tokens. Only used when token_ratelimit is passed. False by default.

Example

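from langchain_community.callbacks.upstash_ratelimit_callback import UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from upstash_redis import Redis
from upstash_ratelimit import Ratelimit, FixedWindow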

redis = Redis.from_env()
ratelimit = Ratelimit(
    redis=redis,
    # fixed window to allow 10 requests every 10 seconds:
    limiter=FixedWindow(max_requests=10, window=10),
)

user_id = "foo"
handler = UpstashRatelimitHandler(
    identifier=user_id,
    request_ratelimit=ratelimit
)

# Initialize a simple runnable to test
chain = RunnableLambda(str)

# pass handler as callback:
output = chain.invoke(
    "input",
    config={
        "callbacks": [handler]
    }
)
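
To make the comment "10 requests every 10 seconds" concrete, here is a minimal in-memory sketch of fixed-window counting. It is not how Upstash Ratelimit is implemented (that library keeps its state in Upstash Redis); the class InMemoryFixedWindow and its allow method are hypothetical names used only for illustration.

```python
from __future__ import annotations

import time
from typing import Optional


class InMemoryFixedWindow:
    """Hypothetical stand-in: counts requests per identifier per window."""

    def __init__(self, max_requests: int, window: int) -> None:
        self.max_requests = max_requests
        self.window = window  # window length in seconds
        self._counts: dict = {}

    def allow(self, identifier: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # All timestamps inside the same window share one bucket index.
        bucket = int(now // self.window)
        key = (identifier, bucket)
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key] <= self.max_requests


limiter = InMemoryFixedWindow(max_requests=10, window=10)

# Eleven requests inside one window: the first ten pass, the last is rejected.
results = [limiter.allow("foo", now=100.0) for _ in range(11)]

# Ten seconds later a new window starts and the counter begins again.
next_window = limiter.allow("foo", now=110.0)
```

In the real handler, a rejected request surfaces as an exception raised from the callback rather than as a boolean return value.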

Attributes

ignore_agent

Whether to ignore agent callbacks.

ignore_chain

Whether to ignore chain callbacks.

ignore_chat_model

Whether to ignore chat model callbacks.

ignore_custom_event

Ignore custom event.

ignore_llm

Whether to ignore LLM callbacks.

ignore_retriever

Whether to ignore retriever callbacks.

ignore_retry

Whether to ignore retry callbacks.

raise_error

Whether to raise an error if an exception occurs.

run_inline

Whether to run the callback inline.

Methods

__init__(identifier, *[, token_ratelimit, ...])

Creates UpstashRatelimitHandler.

on_agent_action(action, *, run_id[, ...])

Run on agent action.

on_agent_finish(finish, *, run_id[, ...])

Run on the agent end.

on_chain_end(outputs, *, run_id[, parent_run_id])

Run when chain ends running.

on_chain_error(error, *, run_id[, parent_run_id])

Run when chain errors.

on_chain_start(serialized, inputs, **kwargs)

Run when chain starts running.

on_chat_model_start(serialized, messages, *, ...)

Run when a chat model starts running.

on_custom_event(name, data, *, run_id[, ...])

Override to define a handler for a custom event.

on_llm_end(response, **kwargs)

Run when LLM ends running.

on_llm_error(error, *, run_id[, parent_run_id])

Run when LLM errors.

on_llm_new_token(token, *[, chunk, ...])

Run on new LLM token.

on_llm_start(serialized, prompts, **kwargs)

Run when LLM starts running.

on_retriever_end(documents, *, run_id[, ...])

Run when Retriever ends running.

on_retriever_error(error, *, run_id[, ...])

Run when Retriever errors.

on_retriever_start(serialized, query, *, run_id)

Run when the Retriever starts running.

on_retry(retry_state, *, run_id[, parent_run_id])

Run on a retry event.

on_text(text, *, run_id[, parent_run_id])

Run on an arbitrary text.

on_tool_end(output, *, run_id[, parent_run_id])

Run when the tool ends running.

on_tool_error(error, *, run_id[, parent_run_id])

Run when tool errors.

on_tool_start(serialized, input_str, *, run_id)

Run when the tool starts running.

reset([identifier])

Creates a new UpstashRatelimitHandler object with the same ratelimit configurations but with a new identifier if it's provided.

__init__(identifier: str, *, token_ratelimit: None = None, request_ratelimit: None = None, include_output_tokens: bool = False)[source]#

Creates UpstashRatelimitHandler. Must be passed an identifier to ratelimit like a user id or an ip address.

Additionally, it must be passed at least one of token_ratelimit or request_ratelimit parameters.

Parameters:
  • identifier (str) – The identifier to rate limit, such as a user ID or an IP address.

  • token_ratelimit (Optional[Ratelimit]) – Ratelimit to limit the number of tokens. Only works with OpenAI models since only these models provide the number of tokens as information in their output.

  • request_ratelimit (Optional[Ratelimit]) – Ratelimit to limit the number of requests.

  • include_output_tokens (bool) – Whether to count output tokens when rate limiting based on number of tokens. Only used when token_ratelimit is passed. False by default.

on_agent_action(action: AgentAction, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run on agent action.

Parameters:
  • action (AgentAction) – The agent action.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_agent_finish(finish: AgentFinish, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run on the agent end.

Parameters:
  • finish (AgentFinish) – The agent finish.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_chain_end(outputs: dict[str, Any], *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when chain ends running.

Parameters:
  • outputs (Dict[str, Any]) – The outputs of the chain.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_chain_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when chain errors.

Parameters:
  • error (BaseException) – The error that occurred.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_chain_start(serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) Any[source]#

Run when chain starts running.

on_chain_start runs multiple times during a chain execution. To make sure that the rate limit is only checked once, the handler keeps a boolean state, _checked. If self._checked is False, it calls limit on request_ratelimit and raises UpstashRatelimitError if the identifier is rate limited.

Parameters:
  • serialized (Dict[str, Any])

  • inputs (Dict[str, Any])

  • kwargs (Any)

Return type:

Any
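
The once-only check described for on_chain_start can be sketched as follows. This is a simplified stand-in, not the library source: RateLimited, FakeResponse, FakeRatelimit and OnceOnlyHandler are hypothetical names, and the sketch assumes that limit returns an object with an allowed flag.

```python
class RateLimited(Exception):
    """Hypothetical stand-in for UpstashRatelimitError."""


class FakeResponse:
    def __init__(self, allowed: bool) -> None:
        self.allowed = allowed


class FakeRatelimit:
    """Hypothetical limiter that allows a fixed number of limit() calls."""

    def __init__(self, budget: int) -> None:
        self.budget = budget
        self.calls = 0

    def limit(self, identifier: str) -> FakeResponse:
        self.calls += 1
        return FakeResponse(allowed=self.calls <= self.budget)


class OnceOnlyHandler:
    def __init__(self, identifier: str, request_ratelimit) -> None:
        self.identifier = identifier
        self.request_ratelimit = request_ratelimit
        self._checked = False

    def on_chain_start(self) -> None:
        # Nested runnables fire this hook repeatedly; only the first call
        # consumes a request from the limiter.
        if self.request_ratelimit is not None and not self._checked:
            response = self.request_ratelimit.limit(self.identifier)
            if not response.allowed:
                raise RateLimited(f"{self.identifier} exceeded the request limit")
            self._checked = True


limiter = FakeRatelimit(budget=1)
handler = OnceOnlyHandler("foo", limiter)
for _ in range(3):
    handler.on_chain_start()  # three hook calls, one limiter call
```

Because the flag lives on the handler instance, reusing the same handler for a second invoke would skip the check entirely, which is why a fresh handler is needed for every invocation.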

on_chat_model_start(serialized: dict[str, Any], messages: list[list[BaseMessage]], *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

Run when a chat model starts running.

ATTENTION: This method is called for chat models. If you’re implementing a handler for a non-chat model, you should use on_llm_start instead.

Parameters:
  • serialized (Dict[str, Any]) – The serialized chat model.

  • messages (List[List[BaseMessage]]) – The messages.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • tags (Optional[List[str]]) – The tags.

  • metadata (Optional[Dict[str, Any]]) – The metadata.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_custom_event(name: str, data: Any, *, run_id: UUID, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

Override to define a handler for a custom event.

Parameters:
  • name (str) – The name of the custom event.

  • data (Any) – The data for the custom event. Format will match the format specified by the user.

  • run_id (UUID) – The ID of the run.

  • tags (list[str] | None) – The tags associated with the custom event (includes inherited tags).

  • metadata (dict[str, Any] | None) – The metadata associated with the custom event (includes inherited metadata).

  • kwargs (Any)

Return type:

Any

Added in version 0.2.15.

on_llm_end(response: LLMResult, **kwargs: Any) None[source]#

Run when LLM ends running.

If include_output_tokens is set to True, the number of tokens in the LLM completion is also counted for rate limiting.

Parameters:
  • response (LLMResult)

  • kwargs (Any)

Return type:

None
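
How include_output_tokens changes the counted amount can be illustrated with a small helper. The function tokens_to_charge is hypothetical, and the sketch assumes an OpenAI-style usage dictionary with prompt_tokens and total_tokens keys; the actual handler may read the usage information differently.

```python
def tokens_to_charge(token_usage: dict, include_output_tokens: bool) -> int:
    """Pick how many tokens count against the limit (illustrative only)."""
    if include_output_tokens:
        # Prompt and completion tokens both count.
        return token_usage["total_tokens"]
    # Only the input (prompt) tokens count.
    return token_usage["prompt_tokens"]


# Assumed shape of the usage data reported by the model.
usage = {"prompt_tokens": 12, "completion_tokens": 30, "total_tokens": 42}

input_only = tokens_to_charge(usage, include_output_tokens=False)
with_output = tokens_to_charge(usage, include_output_tokens=True)
```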

on_llm_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when LLM errors.

Parameters:
  • error (BaseException) – The error that occurred.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_llm_new_token(token: str, *, chunk: GenerationChunk | ChatGenerationChunk | None = None, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run on new LLM token. Only available when streaming is enabled.

Parameters:
  • token (str) – The new token.

  • chunk (GenerationChunk | ChatGenerationChunk) – The new generated chunk, containing content and other information.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_llm_start(serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) None[source]#

Run when LLM starts running.

Parameters:
  • serialized (Dict[str, Any])

  • prompts (List[str])

  • kwargs (Any)

Return type:

None

on_retriever_end(documents: Sequence[Document], *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when Retriever ends running.

Parameters:
  • documents (Sequence[Document]) – The documents retrieved.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_retriever_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when Retriever errors.

Parameters:
  • error (BaseException) – The error that occurred.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_retriever_start(serialized: dict[str, Any], query: str, *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

Run when the Retriever starts running.

Parameters:
  • serialized (Dict[str, Any]) – The serialized Retriever.

  • query (str) – The query.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • tags (Optional[List[str]]) – The tags.

  • metadata (Optional[Dict[str, Any]]) – The metadata.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_retry(retry_state: RetryCallState, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run on a retry event.

Parameters:
  • retry_state (RetryCallState) – The retry state.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_text(text: str, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run on an arbitrary text.

Parameters:
  • text (str) – The text.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_tool_end(output: Any, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when the tool ends running.

Parameters:
  • output (Any) – The output of the tool.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_tool_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

Run when tool errors.

Parameters:
  • error (BaseException) – The error that occurred.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

on_tool_start(serialized: dict[str, Any], input_str: str, *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, inputs: dict[str, Any] | None = None, **kwargs: Any) Any#

Run when the tool starts running.

Parameters:
  • serialized (Dict[str, Any]) – The serialized tool.

  • input_str (str) – The input string.

  • run_id (UUID) – The run ID. This is the ID of the current run.

  • parent_run_id (UUID) – The parent run ID. This is the ID of the parent run.

  • tags (Optional[List[str]]) – The tags.

  • metadata (Optional[Dict[str, Any]]) – The metadata.

  • inputs (Optional[Dict[str, Any]]) – The inputs.

  • kwargs (Any) – Additional keyword arguments.

Return type:

Any

reset(identifier: str | None = None) UpstashRatelimitHandler[source]#

Creates a new UpstashRatelimitHandler object with the same ratelimit configurations but with a new identifier if it’s provided.

Also resets the state of the handler.

Parameters:
  • identifier (str | None)

Return type:

UpstashRatelimitHandler
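
The behaviour described for reset can be pictured with a small stand-in class. SketchHandler is a hypothetical name that mirrors only what the description above states (same ratelimit configuration, fresh state, optional new identifier); it is not the library implementation.

```python
from __future__ import annotations

from typing import Optional


class SketchHandler:
    """Hypothetical stand-in that mirrors only the reset behaviour."""

    def __init__(self, identifier: str, request_ratelimit: object = None) -> None:
        self.identifier = identifier
        self.request_ratelimit = request_ratelimit
        self._checked = False  # per-invocation state

    def reset(self, identifier: Optional[str] = None) -> "SketchHandler":
        # Same limiter configuration, fresh state, optionally a new identifier.
        return SketchHandler(
            identifier=identifier or self.identifier,
            request_ratelimit=self.request_ratelimit,
        )


shared_limiter = object()  # placeholder for a Ratelimit instance
used = SketchHandler("foo", shared_limiter)
used._checked = True  # pretend this handler already ran once

fresh = used.reset()       # same identifier, clean state
other = used.reset("bar")  # different identifier, same limiter
```

A pattern like this lets one configured handler act as a template: call reset before each invoke and pass the returned object as the callback.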
