Skip to main content

Upstash Ratelimit 回调

在本指南中,我们将介绍如何使用 UpstashRatelimitHandler 基于请求数量或令牌数量添加速率限制。该处理程序使用 Upstash 的 ratelimit 库,该库利用了 Upstash Redis

Upstash Ratelimit 的工作原理是在每次调用 limit 方法时向 Upstash Redis 发送一个 HTTP 请求。检查并更新用户的剩余令牌/请求。根据剩余的令牌,我们可以停止执行诸如调用 LLM 或查询向量存储等耗费资源的操作:

response = ratelimit.limit()
if response.allowed:
execute_costly_operation()

UpstashRatelimitHandler 允许您在几分钟内将速率限制逻辑集成到您的链中。

首先,您需要访问 Upstash 控制台 并创建一个 Redis 数据库(查看我们的文档)。创建数据库后,您需要设置环境变量:

UPSTASH_REDIS_REST_URL="****"
UPSTASH_REDIS_REST_TOKEN="****"

接下来,您需要使用以下命令安装 Upstash Ratelimit 和 Redis 库:

pip install upstash-ratelimit upstash-redis

您现在可以开始为您的链添加速率限制了!

每个请求的速率限制

假设我们希望允许用户每分钟调用我们的链 10 次。实现这一点非常简单:

# set env variables
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# create ratelimit
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 10 requests per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=10, window=60),
)

# create handler
user_id = "user_id" # should be a method which gets the user id
handler = UpstashRatelimitHandler(identifier=user_id, request_ratelimit=ratelimit)

# create mock chain
chain = RunnableLambda(str)

# invoke chain with handler:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("Handling ratelimit.", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_chain_start callback: UpstashRatelimitError('Request limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>

请注意,我们将处理程序传递给 invoke 方法,而不是在定义链时传递处理程序。

有关除 FixedWindow 之外的速率限制算法,请参见 upstash-ratelimit 文档

在执行我们管道中的任何步骤之前,速率限制将检查用户是否超过了请求限制。如果超过,则会引发 UpstashRatelimitError

每个令牌的速率限制

另一个选项是基于以下内容对链调用进行速率限制:

  1. 提示中的令牌数量
  2. 提示和LLM完成中的令牌数量

这仅在您的链中有LLM时有效。另一个要求是您使用的LLM应该在其 LLMOutput 中返回令牌使用情况。

工作原理

处理程序将在调用 LLM 之前获取剩余的令牌。如果剩余的令牌大于 0,则会调用 LLM。否则将引发 UpstashRatelimitError

在调用 LLM 之后,令牌使用信息将用于从用户的剩余令牌中扣除。在链的这一阶段不会引发错误。

配置

对于第一个配置,只需像这样初始化处理程序:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 每个窗口 1000 个令牌,窗口大小为 60 秒:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

对于第二个配置,以下是初始化处理程序的方法:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 每个窗口 1000 个令牌,窗口大小为 60 秒:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(
identifier=user_id,
token_ratelimit=ratelimit,
include_output_tokens=True, # 设置为 True
)

您还可以通过同时传递 request_ratelimittoken_ratelimit 参数来实现基于请求和令牌的速率限制。

以下是一个使用 LLM 的链的示例:

# 设置环境变量
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"
os.environ["OPENAI_API_KEY"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# 创建速率限制
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 每个窗口 500 个令牌,窗口大小为 60 秒:
limiter=FixedWindow(max_requests=500, window=60),
)

# 创建处理程序
user_id = "user_id" # 应该是获取用户 ID 的方法
handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

# 创建模拟链
as_str = RunnableLambda(str)
model = ChatOpenAI()

chain = as_str | model

# 使用处理程序调用链:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("处理速率限制。", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_llm_start callback: UpstashRatelimitError('Token limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>

此页面是否有帮助?


您还可以留下详细的反馈 在 GitHub 上