Redis 高级用法深度解析:Stream 消息队列、Redisson 分布式锁与 Lua 脚本实战指南

Redis 高级用法深度解析:Stream 消息队列、Redisson 分布式锁与 Lua 脚本实战指南

引言

提起 Redis,大多数开发者第一时间想到的是缓存——Set/Get 操作、过期策略、缓存穿透防护。然而,Redis 的功能远不止于此。作为一款内存级数据结构服务器,Redis 提供了 Stream 消息队列、RedLock 分布式锁、Lua 脚本原子执行等高级特性,能够在微服务架构、高并发秒杀、异步任务编排等场景中发挥关键作用。

本文将从三个实战维度出发,带你深入 Redis 的高级用法,每部分均配有可运行的代码和配置示例。


一、核心概念速览

特性 适用场景 核心优势
Redis Stream 消息队列、事件溯源、日志收集 支持消费者组、消息持久化、ACK 机制
Redisson 分布式锁 秒杀、分布式定时任务、互斥资源 自动续期、可重入、高可用
Lua 脚本 原子化多步操作、限流器 一次性发送、原子执行、减少网络开销

二、实战一:Redis Stream 构建可靠消息队列

Redis 5.0 引入的 Stream 类型是一个持久化的、支持消费者组的消息队列,比 Pub/Sub 更可靠(消息不会丢失),比 List 更灵活(支持多消费者协同消费)。

2.1 Stream 基础操作

# 添加消息到 Stream(生产者)
redis-cli XADD orders:queue * user_id 1001 product_id 5001 amount 2

# 返回示例: "1728384000001-0"(时间戳-序号)

# 读取消息(消费者 - 非阻塞)
redis-cli XREAD COUNT 2 STREAMS orders:queue 0

# 阻塞读取(等待新消息)
redis-cli XREAD BLOCK 5000 COUNT 10 STREAMS orders:queue $

2.2 消费者组实战

消费者组允许多个消费者实例分工处理消息,每条消息只被组内一个消费者消费。

# 创建消费者组(从 Stream 头部开始消费)
redis-cli XGROUP CREATE orders:queue order-group $ MKSTREAM

# 消费者 A 读取消息
redis-cli XREADGROUP GROUP order-group consumer-a COUNT 2 STREAMS orders:queue >

# 消费者 B 读取消息(自动分配到不同消息)
redis-cli XREADGROUP GROUP order-group consumer-b COUNT 2 STREAMS orders:queue >

# 确认消息已处理
redis-cli XACK orders:queue order-group 1728384000001-0

# 查看未确认的消息(Pending Entries List)
redis-cli XPENDING orders:queue order-group

2.3 Python 生产消费完整示例

import redis
import json
import time
import uuid
from typing import Dict, Any

class OrderStreamProducer:
    def __init__(self, host='localhost', port=6379, stream_key='orders:queue'):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        self.stream_key = stream_key

    def publish_order(self, order: Dict[str, Any]) -> str:
        """发布订单消息到 Stream"""
        message_id = self.client.xadd(
            self.stream_key,
            fields=order,
            maxlen=10000  # 限制队列长度,防止内存溢出
        )
        print(f"[Producer] 订单已发布: {message_id}")
        return message_id

class OrderStreamConsumer:
    def __init__(self, host='localhost', port=6379,
                 stream_key='orders:queue', group_name='order-group',
                 consumer_name: str = None):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        self.stream_key = stream_key
        self.group_name = group_name
        self.consumer_name = consumer_name or f'consumer-{uuid.uuid4().hex[:8]}'
        self._ensure_group()

    def _ensure_group(self):
        """确保消费者组存在"""
        try:
            self.client.xgroup_create(self.stream_key, self.group_name, id='0', mkstream=True)
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise

    def process_orders(self, batch_size: int = 5, block_ms: int = 2000):
        """批量消费订单消息"""
        while True:
            try:
                results = self.client.xreadgroup(
                    groupname=self.group_name,
                    consumername=self.consumer_name,
                    streams={self.stream_key: '>'},
                    count=batch_size,
                    block=block_ms
                )
                if not results:
                    continue

                for stream_name, messages in results:
                    for msg_id, msg_data in messages:
                        print(f"[{self.consumer_name}] 处理订单: {msg_id} → {msg_data}")
                        # 模拟订单处理逻辑
                        time.sleep(0.1)
                        # 确认消息已处理
                        self.client.xack(self.stream_key, self.group_name, msg_id)
                        print(f"[{self.consumer_name}] 订单确认: {msg_id}")

            except Exception as e:
                print(f"[Consumer] 错误: {e}")
                time.sleep(1)

# 使用示例
if __name__ == '__main__':
    producer = OrderStreamProducer()
    producer.publish_order({'user_id': '1001', 'product_id': '5001', 'amount': '2'})

    consumer = OrderStreamConsumer(consumer_name='worker-1')
    # consumer.process_orders()  # 启动消费循环

三、实战二:Redisson 分布式锁确保秒杀原子性

在分布式系统中,Java 的 synchronized 和 Python 的 threading.Lock 无法跨进程生效。Redis 分布式锁是业界标准方案,而 Redisson 框架提供了开箱即用的实现。

3.1 Redisson 配置(Spring Boot)

# application.yml
spring:
  redis:
    host: 192.168.1.100
    port: 6379
    password: ${REDIS_PASSWORD}
    timeout: 3000ms
    lettuce:
      pool:
        max-active: 32
        max-idle: 16
        min-idle: 8

# Redisson 独立配置
redisson:
  single-server-config:
    address: "redis://192.168.1.100:6379"
    connection-pool-size: 64
    connection-minimum-idle-size: 16

3.2 Java 秒杀代码实战

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

@Service
public class SeckillService {

    private static final String LOCK_KEY_PREFIX = "seckill:lock:";

    private final RedissonClient redissonClient;

    public SeckillService(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * 秒杀接口 - 使用 Redisson 分布式锁保证原子性
     * @param productId 商品ID
     * @param userId    用户ID
     * @return true=抢购成功
     */
    public boolean seckill(Long productId, Long userId) {
        String lockKey = LOCK_KEY_PREFIX + productId;
        RLock lock = redissonClient.getLock(lockKey);

        try {
            // 尝试加锁,最多等待3秒,锁30秒后自动释放(看门狗会自动续期)
            boolean locked = lock.tryLock(3, 30, TimeUnit.SECONDS);
            if (!locked) {
                System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 获取锁超时");
                return false;
            }

            // ---------- 业务逻辑:扣减库存 ----------
            int stock = getStockFromDB(productId);
            if (stock <= 0) {
                System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 库存不足");
                return false;
            }

            // 扣减库存
            updateStockInDB(productId, stock - 1);
            createOrder(productId, userId);

            System.out.println("用户[" + userId + "] 抢购商品[" + productId + "] 成功!剩余库存: " + (stock - 1));
            return true;

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("抢购异常: " + e.getMessage());
            return false;
        } finally {
            // 释放锁(Redisson 自动处理,但显式释放更安全)
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private int getStockFromDB(Long productId) { return 100; }
    private void updateStockInDB(Long productId, int newStock) {}
    private void createOrder(Long productId, Long userId) {}
}

3.3 Python 版分布式锁(纯 Redis 实现)

import redis
import time
import uuid
import threading

class RedisDistributedLock:
    """基于 Redis 的分布式锁实现(非 Redisson,但逻辑等价)"""

    def __init__(self, redis_client: redis.Redis, lock_key: str,
                 ttl: int = 30, retry_interval: float = 0.1):
        self.client = redis_client
        self.lock_key = f"lock:{lock_key}"
        self.ttl = ttl
        self.retry_interval = retry_interval
        self.lock_value = None
        self._renewal_thread = None

    def acquire(self, timeout: float = 3.0) -> bool:
        """尝试获取锁(带超时)"""
        lock_value = uuid.uuid4().hex
        deadline = time.time() + timeout

        while time.time() < deadline:
            acquired = self.client.set(
                self.lock_key, lock_value,
                nx=True,                  # 只有 key 不存在时才设置
                ex=self.ttl               # 自动过期,防止死锁
            )
            if acquired:
                self.lock_value = lock_value
                self._start_watchdog()
                return True
            time.sleep(self.retry_interval)
        return False

    def _start_watchdog(self):
        """看门狗线程:自动续期锁"""
        def _renew():
            while self.lock_value:
                time.sleep(self.ttl / 3)  # 每 TTL/3 秒续期一次
                if self.lock_value:
                    self.client.expire(self.lock_key, self.ttl)
        self._renewal_thread = threading.Thread(target=_renew, daemon=True)
        self._renewal_thread.start()

    def release(self):
        """释放锁(使用 Lua 保证原子性)"""
        if self.lock_value:
            lua_script = """
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
            """
            self.client.eval(lua_script, 1, self.lock_key, self.lock_value)
            self.lock_value = None

四、实战三:Lua 脚本实现原子化限流器

Redis 的 Lua 脚本特性允许你在服务端原子执行多条命令,这对实现限流、计数器、令牌桶等场景至关重要。

4.1 滑动窗口限流器 Lua 脚本

-- 文件名: sliding_window_rate_limiter.lua
-- 参数说明:
--   KEYS[1] = 限流 key(通常为 IP 或用户 ID)
--   ARGV[1] = 窗口大小(毫秒)
--   ARGV[2] = 窗口内最大请求数
--   ARGV[3] = 当前时间戳(毫秒)
-- 返回:
--   0 = 被限流, 1 = 允许通过

local key = KEYS[1]
local window_ms = tonumber(ARGV[1])
local max_requests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- 清理窗口外的旧记录
local window_start = now - window_ms
redis.call("ZREMRANGEBYSCORE", key, 0, window_start)

-- 统计窗口内请求数
local current_count = redis.call("ZCARD", key)

-- 检查是否超过限制
if current_count >= max_requests then
    -- 还在限流窗口内,可以返回剩余冷却时间
    local oldest = redis.call("ZRANGE", key, 0, 0, "WITHSCORES")
    local retry_after = window_ms - (now - tonumber(oldest[2]))
    return {0, math.ceil(retry_after / 1000)}
end

-- 允许请求,添加当前记录到有序集合
redis.call("ZADD", key, now, now .. ":" .. math.random())
-- 设置 key 过期时间(窗口大小 + 1 秒,避免内存堆积)
redis.call("PEXPIRE", key, window_ms + 1000)

return {1, 0}

4.2 从 Python 调用限流脚本

import redis
import time
import hashlib

class SlidingWindowRateLimiter:
    def __init__(self, host='localhost', port=6379):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        # 加载 Lua 脚本,获得 SHA 缓存加速
        with open('sliding_window_rate_limiter.lua', 'r') as f:
            self.script_sha = self.client.script_load(f.read())

    def is_allowed(self, identifier: str, window_ms: int = 60000,
                   max_requests: int = 10) -> tuple:
        """
        检查请求是否允许通过
        返回: (allowed: bool, retry_after: int)
        """
        key = f"ratelimit:{identifier}"
        now = int(time.time() * 1000)

        result = self.client.evalsha(
            self.script_sha,
            1,              # KEYS 数量
            key,            # KEYS[1]
            window_ms,      # ARGV[1]
            max_requests,   # ARGV[2]
            now             # ARGV[3]
        )

        allowed = bool(result[0])
        retry_after = int(result[1]) if len(result) > 1 else 0
        return allowed, retry_after


# 使用示例
limiter = SlidingWindowRateLimiter()

# 模拟用户 1001 的请求
for i in range(15):
    allowed, retry = limiter.is_allowed("user:1001", window_ms=60000, max_requests=10)
    status = "✅ 通过" if allowed else f"❌ 限流 (请等待 {retry}s)"
    print(f"请求 {i+1:2d}: {status}")
    time.sleep(0.1)

4.3 Lua 脚本在 Redis 中的注册与调用

# 方式一:直接执行 Lua 字符串
redis-cli EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey hello

# 方式二:加载脚本获得 SHA
redis-cli SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回: "4e6d1c0fa8c0..."

# 通过 SHA 调用(更快,避免每次传输脚本)
redis-cli EVALSHA 4e6d1c0fa8c0... 1 mykey

# 查看已加载的脚本
redis-cli SCRIPT EXISTS 4e6d1c0fa8c0...

# 清空脚本缓存(生产环境慎用)
redis-cli SCRIPT FLUSH

五、常见问题与最佳实践

Q1:Redis Stream 和 Kafka 的选择?

答: Stream 适合中小规模(<10万消息/秒)、低延迟、无需复杂分区策略的场景。Kafka 适合大规模日志聚合、需要长期消息回溯、多消费者独立 offset 的场景。Stream 的优势在于部署轻量(无需额外组件)和原生集成 Redis 生态。

Q2:分布式锁的「看门狗」机制是什么?

答: 看门狗(Watchdog)是 Redisson 的核心机制——当持有锁的线程还在执行(未超时),看门狗会每隔 lockWatchdogTimeout/3 毫秒自动续期锁的 TTL,确保业务逻辑未完成时锁不会过期。这避免了预估业务执行时间的难题。

Q3:Redis Lua 脚本有什么注意事项?

答:
1. 不要使用随机值 — Lua 脚本在 Redis Cluster 中可能被重放,redis.random() 会导致不一致
2. 保持简短 — 脚本执行期间会阻塞 Redis 主线程,控制在 10ms 以内
3. 使用 EVALSHA — 生产环境用 EVALSHA 替代 EVAL 减少带宽开销
4. 显式声明 KEYS — KEYS 和 ARGV 要明确区分,方便 Cluster 路由

Q4:Redis Stream 消息积压怎么办?

答: 设置 MAXLEN ~ 100000(近似裁剪)限制队列长度;配置监控告警监控 XLEN;实施背压机制:当队列长度超过阈值时生产者降速;必要时启用消费者扩容。


总结

本文从三个实战维度深入挖掘了 Redis 的高级用法:

实战项目 核心技术 生产收益
Stream 消息队列 XADD / XREADGROUP / XACK 异步解耦,消息不丢失,支持消费者组
Redisson 分布式锁 tryLock / Watchdog / Lua 释放 跨进程互斥,自动续期,防止死锁
Lua 限流器 EVALSHA / ZSET 滑动窗口 原子化限流,毫秒级响应,内存高效

这些高级特性让 Redis 从一个「缓存工具」进化为「分布式系统核心基础设施」。建议你在自己的项目中逐步引入这些能力——先从 Stream 替换简单的 Redis List 队列开始,再逐步引入分布式锁和 Lua 脚本,每一步都能带来架构层面的提升。

最后的小提示: 生产环境中建议 Redis 版本 ≥ 6.2(Stream 消费者组功能的完善版本),并开启 AOF 持久化(appendfsync everysec 兼顾性能与安全)。如果你正在使用 Redis Cluster,Lua 脚本中涉及的所有 key 必须属于同一个 hash slot。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片快捷回复

    暂无评论内容