Redis 高级用法深度解析:Stream 消息队列、Redisson 分布式锁与 Lua 脚本实战指南
引言
提起 Redis,大多数开发者第一时间想到的是缓存——Set/Get 操作、过期策略、缓存穿透防护。然而,Redis 的功能远不止于此。作为一款内存级数据结构服务器,Redis 提供了 Stream 消息队列、RedLock 分布式锁、Lua 脚本原子执行等高级特性,能够在微服务架构、高并发秒杀、异步任务编排等场景中发挥关键作用。
本文将从三个实战维度出发,带你深入 Redis 的高级用法,每部分均配有可运行的代码和配置示例。
一、核心概念速览
| 特性 | 适用场景 | 核心优势 |
|---|---|---|
| Redis Stream | 消息队列、事件溯源、日志收集 | 支持消费者组、消息持久化、ACK 机制 |
| Redisson 分布式锁 | 秒杀、分布式定时任务、互斥资源 | 自动续期、可重入、高可用 |
| Lua 脚本 | 原子化多步操作、限流器 | 一次性发送、原子执行、减少网络开销 |
二、实战一:Redis Stream 构建可靠消息队列
Redis 5.0 引入的 Stream 类型是一个持久化的、支持消费者组的消息队列,比 Pub/Sub 更可靠(消息不会丢失),比 List 更灵活(支持多消费者协同消费)。
2.1 Stream 基础操作
# 添加消息到 Stream(生产者)
redis-cli XADD orders:queue * user_id 1001 product_id 5001 amount 2
# 返回示例: "1728384000001-0"(时间戳-序号)
# 读取消息(消费者 - 非阻塞)
redis-cli XREAD COUNT 2 STREAMS orders:queue 0
# 阻塞读取(等待新消息)
redis-cli XREAD BLOCK 5000 COUNT 10 STREAMS orders:queue $
2.2 消费者组实战
消费者组允许多个消费者实例分工处理消息,每条消息只被组内一个消费者消费。
# 创建消费者组(从 Stream 头部开始消费)
redis-cli XGROUP CREATE orders:queue order-group $ MKSTREAM
# 消费者 A 读取消息
redis-cli XREADGROUP GROUP order-group consumer-a COUNT 2 STREAMS orders:queue >
# 消费者 B 读取消息(自动分配到不同消息)
redis-cli XREADGROUP GROUP order-group consumer-b COUNT 2 STREAMS orders:queue >
# 确认消息已处理
redis-cli XACK orders:queue order-group 1728384000001-0
# 查看未确认的消息(Pending Entries List)
redis-cli XPENDING orders:queue order-group
2.3 Python 生产消费完整示例
import redis
import json
import time
import uuid
from typing import Dict, Any
class OrderStreamProducer:
def __init__(self, host='localhost', port=6379, stream_key='orders:queue'):
self.client = redis.Redis(host=host, port=port, decode_responses=True)
self.stream_key = stream_key
def publish_order(self, order: Dict[str, Any]) -> str:
"""发布订单消息到 Stream"""
message_id = self.client.xadd(
self.stream_key,
fields=order,
maxlen=10000 # 限制队列长度,防止内存溢出
)
print(f"[Producer] 订单已发布: {message_id}")
return message_id
class OrderStreamConsumer:
def __init__(self, host='localhost', port=6379,
stream_key='orders:queue', group_name='order-group',
consumer_name: str = None):
self.client = redis.Redis(host=host, port=port, decode_responses=True)
self.stream_key = stream_key
self.group_name = group_name
self.consumer_name = consumer_name or f'consumer-{uuid.uuid4().hex[:8]}'
self._ensure_group()
def _ensure_group(self):
"""确保消费者组存在"""
try:
self.client.xgroup_create(self.stream_key, self.group_name, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if 'BUSYGROUP' not in str(e):
raise
def process_orders(self, batch_size: int = 5, block_ms: int = 2000):
"""批量消费订单消息"""
while True:
try:
results = self.client.xreadgroup(
groupname=self.group_name,
consumername=self.consumer_name,
streams={self.stream_key: '>'},
count=batch_size,
block=block_ms
)
if not results:
continue
for stream_name, messages in results:
for msg_id, msg_data in messages:
print(f"[{self.consumer_name}] 处理订单: {msg_id} → {msg_data}")
# 模拟订单处理逻辑
time.sleep(0.1)
# 确认消息已处理
self.client.xack(self.stream_key, self.group_name, msg_id)
print(f"[{self.consumer_name}] 订单确认: {msg_id}")
except Exception as e:
print(f"[Consumer] 错误: {e}")
time.sleep(1)
# 使用示例
if __name__ == '__main__':
producer = OrderStreamProducer()
producer.publish_order({'user_id': '1001', 'product_id': '5001', 'amount': '2'})
consumer = OrderStreamConsumer(consumer_name='worker-1')
# consumer.process_orders() # 启动消费循环
三、实战二:Redisson 分布式锁确保秒杀原子性
在分布式系统中,Java 的 synchronized 和 Python 的 threading.Lock 无法跨进程生效。Redis 分布式锁是业界标准方案,而 Redisson 框架提供了开箱即用的实现。
3.1 Redisson 配置(Spring Boot)
# application.yml
spring:
redis:
host: 192.168.1.100
port: 6379
password: ${REDIS_PASSWORD}
timeout: 3000ms
lettuce:
pool:
max-active: 32
max-idle: 16
min-idle: 8
# Redisson 独立配置
redisson:
single-server-config:
address: "redis://192.168.1.100:6379"
connection-pool-size: 64
connection-minimum-idle-size: 16
3.2 Java 秒杀代码实战
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class SeckillService {
private static final String LOCK_KEY_PREFIX = "seckill:lock:";
private final RedissonClient redissonClient;
public SeckillService(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 秒杀接口 - 使用 Redisson 分布式锁保证原子性
* @param productId 商品ID
* @param userId 用户ID
* @return true=抢购成功
*/
public boolean seckill(Long productId, Long userId) {
String lockKey = LOCK_KEY_PREFIX + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待3秒,锁30秒后自动释放(看门狗会自动续期)
boolean locked = lock.tryLock(3, 30, TimeUnit.SECONDS);
if (!locked) {
System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 获取锁超时");
return false;
}
// ---------- 业务逻辑:扣减库存 ----------
int stock = getStockFromDB(productId);
if (stock <= 0) {
System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 库存不足");
return false;
}
// 扣减库存
updateStockInDB(productId, stock - 1);
createOrder(productId, userId);
System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 成功!剩余库存: " + (stock - 1));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("抢购异常: " + e.getMessage());
return false;
} finally {
// 释放锁(Redisson 自动处理,但显式释放更安全)
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private int getStockFromDB(Long productId) { return 100; }
private void updateStockInDB(Long productId, int newStock) {}
private void createOrder(Long productId, Long userId) {}
}
3.3 Python 版分布式锁(纯 Redis 实现)
import redis
import time
import uuid
import threading
class RedisDistributedLock:
"""基于 Redis 的分布式锁实现(非 Redisson,但逻辑等价)"""
def __init__(self, redis_client: redis.Redis, lock_key: str,
ttl: int = 30, retry_interval: float = 0.1):
self.client = redis_client
self.lock_key = f"lock:{lock_key}"
self.ttl = ttl
self.retry_interval = retry_interval
self.lock_value = None
self._renewal_thread = None
def acquire(self, timeout: float = 3.0) -> bool:
"""尝试获取锁(带超时)"""
lock_value = uuid.uuid4().hex
deadline = time.time() + timeout
while time.time() < deadline:
acquired = self.client.set(
self.lock_key, lock_value,
nx=True, # 只有 key 不存在时才设置
ex=self.ttl # 自动过期,防止死锁
)
if acquired:
self.lock_value = lock_value
self._start_watchdog()
return True
time.sleep(self.retry_interval)
return False
def _start_watchdog(self):
"""看门狗线程:自动续期锁"""
def _renew():
while self.lock_value:
time.sleep(self.ttl / 3) # 每 TTL/3 秒续期一次
if self.lock_value:
self.client.expire(self.lock_key, self.ttl)
self._renewal_thread = threading.Thread(target=_renew, daemon=True)
self._renewal_thread.start()
def release(self):
"""释放锁(使用 Lua 保证原子性)"""
if self.lock_value:
lua_script = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
self.client.eval(lua_script, 1, self.lock_key, self.lock_value)
self.lock_value = None
四、实战三:Lua 脚本实现原子化限流器
Redis 的 Lua 脚本特性允许你在服务端原子执行多条命令,这对实现限流、计数器、令牌桶等场景至关重要。
4.1 滑动窗口限流器 Lua 脚本
-- 文件名: sliding_window_rate_limiter.lua
-- 参数说明:
-- KEYS[1] = 限流 key(通常为 IP 或用户 ID)
-- ARGV[1] = 窗口大小(毫秒)
-- ARGV[2] = 窗口内最大请求数
-- ARGV[3] = 当前时间戳(毫秒)
-- 返回:
-- 0 = 被限流, 1 = 允许通过
local key = KEYS[1]
local window_ms = tonumber(ARGV[1])
local max_requests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 清理窗口外的旧记录
local window_start = now - window_ms
redis.call("ZREMRANGEBYSCORE", key, 0, window_start)
-- 统计窗口内请求数
local current_count = redis.call("ZCARD", key)
-- 检查是否超过限制
if current_count >= max_requests then
-- 还在限流窗口内,可以返回剩余冷却时间
local oldest = redis.call("ZRANGE", key, 0, 0, "WITHSCORES")
local retry_after = window_ms - (now - tonumber(oldest[2]))
return {0, math.ceil(retry_after / 1000)}
end
-- 允许请求,添加当前记录到有序集合
redis.call("ZADD", key, now, now .. ":" .. math.random())
-- 设置 key 过期时间(窗口大小 + 1 秒,避免内存堆积)
redis.call("PEXPIRE", key, window_ms + 1000)
return {1, 0}
4.2 从 Python 调用限流脚本
import redis
import time
import hashlib
class SlidingWindowRateLimiter:
def __init__(self, host='localhost', port=6379):
self.client = redis.Redis(host=host, port=port, decode_responses=True)
# 加载 Lua 脚本,获得 SHA 缓存加速
with open('sliding_window_rate_limiter.lua', 'r') as f:
self.script_sha = self.client.script_load(f.read())
def is_allowed(self, identifier: str, window_ms: int = 60000,
max_requests: int = 10) -> tuple:
"""
检查请求是否允许通过
返回: (allowed: bool, retry_after: int)
"""
key = f"ratelimit:{identifier}"
now = int(time.time() * 1000)
result = self.client.evalsha(
self.script_sha,
1, # KEYS 数量
key, # KEYS[1]
window_ms, # ARGV[1]
max_requests, # ARGV[2]
now # ARGV[3]
)
allowed = bool(result[0])
retry_after = int(result[1]) if len(result) > 1 else 0
return allowed, retry_after
# 使用示例
limiter = SlidingWindowRateLimiter()
# 模拟用户 1001 的请求
for i in range(15):
allowed, retry = limiter.is_allowed("user:1001", window_ms=60000, max_requests=10)
status = "✅ 通过" if allowed else f"❌ 限流 (请等待 {retry}s)"
print(f"请求 {i+1:2d}: {status}")
time.sleep(0.1)
4.3 Lua 脚本在 Redis 中的注册与调用
# 方式一:直接执行 Lua 字符串
redis-cli EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey hello
# 方式二:加载脚本获得 SHA
redis-cli SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回: "4e6d1c0fa8c0..."
# 通过 SHA 调用(更快,避免每次传输脚本)
redis-cli EVALSHA 4e6d1c0fa8c0... 1 mykey
# 查看已加载的脚本
redis-cli SCRIPT EXISTS 4e6d1c0fa8c0...
# 清空脚本缓存(生产环境慎用)
redis-cli SCRIPT FLUSH
五、常见问题与最佳实践
Q1:Redis Stream 和 Kafka 的选择?
答: Stream 适合中小规模(<10万消息/秒)、低延迟、无需复杂分区策略的场景。Kafka 适合大规模日志聚合、需要长期消息回溯、多消费者独立 offset 的场景。Stream 的优势在于部署轻量(无需额外组件)和原生集成 Redis 生态。
Q2:分布式锁的「看门狗」机制是什么?
答: 看门狗(Watchdog)是 Redisson 的核心机制——当持有锁的线程还在执行(未超时),看门狗会每隔 lockWatchdogTimeout/3 毫秒自动续期锁的 TTL,确保业务逻辑未完成时锁不会过期。这避免了预估业务执行时间的难题。
Q3:Redis Lua 脚本有什么注意事项?
答:
1. 不要使用随机值 — Lua 脚本在 Redis Cluster 中可能被重放,redis.random() 会导致不一致
2. 保持简短 — 脚本执行期间会阻塞 Redis 主线程,控制在 10ms 以内
3. 使用 EVALSHA — 生产环境用 EVALSHA 替代 EVAL 减少带宽开销
4. 显式声明 KEYS — KEYS 和 ARGV 要明确区分,方便 Cluster 路由
Q4:Redis Stream 消息积压怎么办?
答: 设置 MAXLEN ~ 100000(近似裁剪)限制队列长度;配置监控告警监控 XLEN;实施背压机制:当队列长度超过阈值时生产者降速;必要时启用消费者扩容。
总结
本文从三个实战维度深入挖掘了 Redis 的高级用法:
| 实战项目 | 核心技术 | 生产收益 |
|---|---|---|
| Stream 消息队列 | XADD / XREADGROUP / XACK | 异步解耦,消息不丢失,支持消费者组 |
| Redisson 分布式锁 | tryLock / Watchdog / Lua 释放 | 跨进程互斥,自动续期,防止死锁 |
| Lua 限流器 | EVALSHA / ZSET 滑动窗口 | 原子化限流,毫秒级响应,内存高效 |
这些高级特性让 Redis 从一个「缓存工具」进化为「分布式系统核心基础设施」。建议你在自己的项目中逐步引入这些能力——先从 Stream 替换简单的 Redis List 队列开始,再逐步引入分布式锁和 Lua 脚本,每一步都能带来架构层面的提升。
最后的小提示: 生产环境中建议 Redis 版本 ≥ 6.2(Stream 消费者组功能的完善版本),并开启 AOF 持久化(appendfsync everysec 兼顾性能与安全)。如果你正在使用 Redis Cluster,Lua 脚本中涉及的所有 key 必须属于同一个 hash slot。















暂无评论内容