Skip to content

fix(redis_lock): refactor RedisFairLock to use ZSET for queue management and fix loop shutdown#758

Merged
keeees merged 1 commit intodevelopfrom
refactor/redis-lock
Apr 1, 2026
Merged

fix(redis_lock): refactor RedisFairLock to use ZSET for queue management and fix loop shutdown#758
keeees merged 1 commit intodevelopfrom
refactor/redis-lock

Conversation

@myhMARS
Copy link
Copy Markdown
Collaborator

@myhMARS myhMARS commented Apr 1, 2026

  • Replace list-based queue with sorted set for better dead client cleanup
  • Add zombie cleanup buffer to handle expired queue entries
  • Fix potential None loop reference in graceful shutdown
  • Add task start time to write_message_task result
  • Update lock acquisition script to use ZSET operations
  • Remove unused queue cleanup scripts
  • Ensure proper lock release and renewal failure handling

Summary by Sourcery

重构 RedisFairLock,使用基于有序集合(sorted set)的锁获取脚本,并改进清理和失败处理;同时调整任务执行逻辑,以暴露任务开始时间并安全地关闭事件循环。

New Features:

  • write_message_task 的结果负载中包含任务开始时间戳。

Bug Fixes:

  • 在优雅关闭事件循环时,防止在事件循环可能尚未创建的情况下引发错误。
  • 当续约失败时停止锁续约循环,以确保不会让陈旧锁继续存活。

Enhancements:

  • 将基于列表的 Redis 锁队列替换为由 ZSET 支持的锁获取脚本,该脚本会定期清理已过期的条目。
  • 简化解锁逻辑,仅依赖键解锁脚本,并移除未使用的队列管理脚本。
Original summary in English

Summary by Sourcery

Refactor RedisFairLock to use a sorted-set-based acquisition script with improved cleanup and failure handling, and adjust task execution to expose start time and safely shut down the event loop.

New Features:

  • Include task start timestamp in the write_message_task result payload.

Bug Fixes:

  • Prevent errors when gracefully shutting down an event loop that may not have been created.
  • Stop the lock renewal loop when renewal fails to ensure stale locks are not kept alive.

Enhancements:

  • Replace the list-based Redis lock queue with a ZSET-backed acquisition script that periodically cleans up expired entries.
  • Simplify lock release to rely solely on the key unlock script while removing unused queue management scripts.

…ent and fix loop shutdown

- Replace list-based queue with sorted set for better dead client cleanup
- Add zombie cleanup buffer to handle expired queue entries
- Fix potential None loop reference in graceful shutdown
- Add task start time to write_message_task result
- Update lock acquisition script to use ZSET operations
- Remove unused queue cleanup scripts
- Ensure proper lock release and renewal failure handling
@myhMARS myhMARS requested a review from keeees April 1, 2026 03:16
@myhMARS myhMARS self-assigned this Apr 1, 2026
@myhMARS myhMARS added bug Something isn't working enhancement New feature or request labels Apr 1, 2026
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Apr 1, 2026

Reviewer's Guide

重构 RedisFairLock,使其使用 Redis 有序集合(sorted set)和单个获取锁的 Lua 脚本(带“僵尸”条目清理)来管理队列,改进续期与释放行为,并加固 Celery 写入任务循环的生命周期与结果元数据。

使用 ZSET 和 ACQUIRE_SCRIPT 的 RedisFairLock.acquire 时序图

sequenceDiagram
    participant Client
    participant RedisFairLock
    participant RedisServer as Redis

    Client->>RedisFairLock: acquire()
    activate RedisFairLock
    RedisFairLock->>RedisServer: EVAL ACQUIRE_SCRIPT(queue_key, key, value, expire, timeout)
    activate RedisServer
    RedisServer-->>RedisFairLock: 1 (lock acquired) or 0 (not acquired)
    deactivate RedisServer
    alt ok == 1
        RedisFairLock->>RedisFairLock: _locked = True
        alt auto_renewal is True
            RedisFairLock->>RedisFairLock: _start_renewal()
        end
        RedisFairLock-->>Client: True
    else ok == 0 and not timed out
        RedisFairLock->>RedisFairLock: sleep(retry_interval)
        RedisFairLock->>RedisServer: EVAL ACQUIRE_SCRIPT(...)
        RedisServer-->>RedisFairLock: 1 or 0
        RedisFairLock->>Client: (loop continues)
    else timeout reached
        RedisFairLock->>RedisServer: ZREM queue_key value
        RedisFairLock-->>Client: False
    end
    deactivate RedisFairLock
Loading

Celery 写入任务循环生命周期与 RedisFairLock 使用的时序图

sequenceDiagram
    actor Worker
    participant Task as MemoryWriteTask
    participant RedisFairLock
    participant EventLoop as AsyncioLoop

    Worker->>Task: execute()
    activate Task
    Task->>RedisFairLock: create instance(key, redis_client,...)
    Task->>Task: task_start_time = time()
    Task->>EventLoop: set_asyncio_event_loop()
    activate EventLoop
    Task->>EventLoop: run_until_complete(_run())
    EventLoop-->>Task: result
    deactivate EventLoop
    alt redis_client is not None
        Task->>RedisFairLock: acquire() (context manager entry)
        RedisFairLock-->>Task: lock acquired
        Task->>Task: perform work
        Task->>RedisFairLock: release() (context manager exit)
    end
    Task->>Task: compute elapsed_time
    Task-->>Worker: status, result, start_at, end_user_id, config_id, elapsed_time
    alt loop is not None
        Task->>EventLoop: _shutdown_loop_gracefully(loop)
    end
    deactivate Task
Loading

重构后 RedisFairLock 的类图

classDiagram
    class RedisFairLock {
        +int CLEANUP_BUFFER
        -str key
        -str queue_key
        -str value
        -int expire
        -float retry_interval
        -float timeout
        -bool auto_renewal
        -redis.StrictRedis redis
        -bool _locked
        -threading.Event _stop_renew
        -threading.Thread _renew_thread
        +RedisFairLock(key, redis_client, expire, retry_interval, timeout, auto_renewal)
        +bool acquire()
        -void _renewal_loop()
        -void _start_renewal()
        -void _stop_renewal_loop()
        +void release()
        +RedisFairLock __enter__()
        +void __exit__(exc_type, exc_value, traceback)
    }
Loading

Redis ACQUIRE_SCRIPT 获取锁与僵尸条目清理的流程图

flowchart TD
    A[Start ACQUIRE_SCRIPT] --> B[Read queue_key and lock_key]
    B --> C[Set local client_id, expire, time_out]
    C --> D[Get now from redis.call time]
    D --> E{ZSET contains client_id?}
    E -- No --> F[ZADD queue_key now client_id]
    E -- Yes --> G[Skip enqueue]
    F --> H
    G --> H
    H[Find expired entries with ZRANGEBYSCORE queue_key 0 now - time_out] --> I[For each expired value, ZREM queue_key value]
    I --> J[Get first client in ZSET with ZRANGE queue_key 0 0]
    J --> K{first == client_id?}
    K -- No --> L[Return 0]
    K -- Yes --> M{SET lock_key client_id NX EX expire success?}
    M -- Yes --> N[ZREM queue_key client_id and return 1]
    M -- No --> O{GET lock_key == client_id?}
    O -- Yes --> P[EXPIRE lock_key expire and return 1]
    O -- No --> Q[Return 0]
    L --> R[End]
    N --> R
    P --> R
    Q --> R
Loading

文件级变更

Change Details Files
将 RedisFairLock 队列从基于列表的操作重构为基于 ZSET 加单个 Lua 获取脚本的实现,并内置“僵尸”条目清理。
  • 引入 ACQUIRE_SCRIPT Lua 脚本:在有序集合中按时间戳入队客户端 ID,移除过期条目,并在调用方位于队首时有条件地设置/续期锁。
  • 将队列键名从基于列表的 ':queue' 键改为基于 ZSET 的 ':zset' 键,并更新客户端值格式以嵌入时间戳。
  • 用基于 ACQUIRE_SCRIPT 的 eval 获取逻辑和超时时的 zrem 操作,替换原先的列表操作(rpush/lindex/lpop/lrem)。
api/app/utils/redis_lock.py
收紧锁续期和释放语义,并移除已不再使用的队列管理脚本。
  • 更新续期循环:捕获 RENEW_SCRIPT 的返回结果,如果续期失败则停止循环,防止持续执行陈旧的续期尝试。
  • 移除 CLEANUP_DEAD_HEAD_SCRIPT 和 SAFE_RELEASE_QUEUE_SCRIPT 以及 release() 中相应的调用,仅依赖 UNLOCK_SCRIPT 删除键来释放锁。
  • 调整 retry_interval 的默认值以降低 acquire 轮询频率,并新增一个用于超时计算的“僵尸清理缓冲”常量。
api/app/utils/redis_lock.py
加固 Celery 写入任务循环的生命周期,并丰富任务结果元数据。
  • 将 loop 初始化为 None,并在调用 _shutdown_loop_gracefully 前进行 truthy 检查,避免在提前返回或出错时使用未初始化的事件循环。
  • 在创建事件循环之前记录 task_start_time,并作为 'start_at' 字段包含在返回的结果负载中。
  • 确保 import 顺序一致,以支持在任务函数中新增的 time 使用。
api/app/tasks.py
api/app/utils/redis_lock.py

Tips and commands

与 Sourcery 交互

  • 触发新评审: 在 pull request 中评论 @sourcery-ai review
  • 继续讨论: 直接回复 Sourcery 的评审评论。
  • 从评审评论生成 GitHub Issue: 在评审评论下回复,要求 Sourcery 从该评论创建 issue。你也可以直接回复 @sourcery-ai issue,从该评论生成 issue。
  • 生成 pull request 标题: 在 pull request 标题中的任意位置写上 @sourcery-ai 即可随时生成标题。也可以在 pull request 中评论 @sourcery-ai title 来(重新)生成标题。
  • 生成 pull request 摘要: 在 pull request 正文任意位置写上 @sourcery-ai summary,即可在对应位置生成 PR 摘要。也可以在 pull request 中评论 @sourcery-ai summary 来(重新)生成摘要。
  • 生成评审指南: 在 pull request 中评论 @sourcery-ai guide,即可(重新)生成评审者指南。
  • 标记所有 Sourcery 评论为已解决: 在 pull request 中评论 @sourcery-ai resolve,将所有 Sourcery 评论标记为已解决。在你已经处理了所有评论且不希望再看到它们时非常有用。
  • 撤销所有 Sourcery 评审: 在 pull request 中评论 @sourcery-ai dismiss,撤销所有现有的 Sourcery 评审。特别适合在你想从头开始一次新评审时使用——不要忘记再评论 @sourcery-ai review 来触发新的评审!

自定义你的体验

访问你的 dashboard 以:

  • 开启或关闭评审特性,例如 Sourcery 生成的 pull request 摘要、评审者指南等。
  • 更改评审语言。
  • 添加、移除或编辑自定义评审指令。
  • 调整其他评审设置。

获取帮助

Original review guide in English

Reviewer's Guide

Refactors RedisFairLock to manage its queue using a Redis sorted set and a single acquire Lua script (with zombie cleanup), improves renewal and release behavior, and hardens the Celery write task loop lifecycle and result metadata.

Sequence diagram for RedisFairLock.acquire using ZSET and ACQUIRE_SCRIPT

sequenceDiagram
    participant Client
    participant RedisFairLock
    participant RedisServer as Redis

    Client->>RedisFairLock: acquire()
    activate RedisFairLock
    RedisFairLock->>RedisServer: EVAL ACQUIRE_SCRIPT(queue_key, key, value, expire, timeout)
    activate RedisServer
    RedisServer-->>RedisFairLock: 1 (lock acquired) or 0 (not acquired)
    deactivate RedisServer
    alt ok == 1
        RedisFairLock->>RedisFairLock: _locked = True
        alt auto_renewal is True
            RedisFairLock->>RedisFairLock: _start_renewal()
        end
        RedisFairLock-->>Client: True
    else ok == 0 and not timed out
        RedisFairLock->>RedisFairLock: sleep(retry_interval)
        RedisFairLock->>RedisServer: EVAL ACQUIRE_SCRIPT(...)
        RedisServer-->>RedisFairLock: 1 or 0
        RedisFairLock->>Client: (loop continues)
    else timeout reached
        RedisFairLock->>RedisServer: ZREM queue_key value
        RedisFairLock-->>Client: False
    end
    deactivate RedisFairLock
Loading

Sequence diagram for Celery write task loop lifecycle and RedisFairLock usage

sequenceDiagram
    actor Worker
    participant Task as MemoryWriteTask
    participant RedisFairLock
    participant EventLoop as AsyncioLoop

    Worker->>Task: execute()
    activate Task
    Task->>RedisFairLock: create instance(key, redis_client,...)
    Task->>Task: task_start_time = time()
    Task->>EventLoop: set_asyncio_event_loop()
    activate EventLoop
    Task->>EventLoop: run_until_complete(_run())
    EventLoop-->>Task: result
    deactivate EventLoop
    alt redis_client is not None
        Task->>RedisFairLock: acquire() (context manager entry)
        RedisFairLock-->>Task: lock acquired
        Task->>Task: perform work
        Task->>RedisFairLock: release() (context manager exit)
    end
    Task->>Task: compute elapsed_time
    Task-->>Worker: status, result, start_at, end_user_id, config_id, elapsed_time
    alt loop is not None
        Task->>EventLoop: _shutdown_loop_gracefully(loop)
    end
    deactivate Task
Loading

Class diagram for refactored RedisFairLock

classDiagram
    class RedisFairLock {
        +int CLEANUP_BUFFER
        -str key
        -str queue_key
        -str value
        -int expire
        -float retry_interval
        -float timeout
        -bool auto_renewal
        -redis.StrictRedis redis
        -bool _locked
        -threading.Event _stop_renew
        -threading.Thread _renew_thread
        +RedisFairLock(key, redis_client, expire, retry_interval, timeout, auto_renewal)
        +bool acquire()
        -void _renewal_loop()
        -void _start_renewal()
        -void _stop_renewal_loop()
        +void release()
        +RedisFairLock __enter__()
        +void __exit__(exc_type, exc_value, traceback)
    }
Loading

Flow diagram for Redis ACQUIRE_SCRIPT lock acquisition and zombie cleanup

flowchart TD
    A[Start ACQUIRE_SCRIPT] --> B[Read queue_key and lock_key]
    B --> C[Set local client_id, expire, time_out]
    C --> D[Get now from redis.call time]
    D --> E{ZSET contains client_id?}
    E -- No --> F[ZADD queue_key now client_id]
    E -- Yes --> G[Skip enqueue]
    F --> H
    G --> H
    H[Find expired entries with ZRANGEBYSCORE queue_key 0 now - time_out] --> I[For each expired value, ZREM queue_key value]
    I --> J[Get first client in ZSET with ZRANGE queue_key 0 0]
    J --> K{first == client_id?}
    K -- No --> L[Return 0]
    K -- Yes --> M{SET lock_key client_id NX EX expire success?}
    M -- Yes --> N[ZREM queue_key client_id and return 1]
    M -- No --> O{GET lock_key == client_id?}
    O -- Yes --> P[EXPIRE lock_key expire and return 1]
    O -- No --> Q[Return 0]
    L --> R[End]
    N --> R
    P --> R
    Q --> R
Loading

File-Level Changes

Change Details Files
Refactor RedisFairLock queue from list-based operations to a ZSET plus a single Lua acquire script with built-in zombie entry cleanup.
  • Introduce ACQUIRE_SCRIPT Lua script that enqueues client IDs in a sorted set with timestamps, removes expired entries, and conditionally sets/renews the lock if the caller is at the head of the queue.
  • Change queue key naming from a list-based ':queue' key to a ZSET-based ':zset' key and update client value format to embed a timestamp.
  • Replace list operations (rpush/lindex/lpop/lrem) with eval-based acquisition using ACQUIRE_SCRIPT and zrem on timeout.
api/app/utils/redis_lock.py
Tighten lock renewal and release semantics and remove now-unused queue management scripts.
  • Update renewal loop to capture the result of RENEW_SCRIPT and stop the loop if renewal fails, preventing stale renew attempts.
  • Remove CLEANUP_DEAD_HEAD_SCRIPT and SAFE_RELEASE_QUEUE_SCRIPT and corresponding calls from release(), relying solely on key deletion via UNLOCK_SCRIPT.
  • Adjust retry_interval default value to slow down acquire polling and add a zombie cleanup buffer constant used in timeout calculations.
api/app/utils/redis_lock.py
Harden Celery write task loop lifecycle and enrich task result metadata.
  • Initialize loop to None and guard _shutdown_loop_gracefully with a truthy loop check to avoid using an uninitialized event loop on early returns or errors.
  • Record task_start_time before creating the event loop and include it in the returned result payload as 'start_at'.
  • Ensure imports are ordered consistently to support the new time usage in the task function.
api/app/tasks.py
api/app/utils/redis_lock.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我已经审查了你的更改,一切看起来都很棒!


Sourcery 对开源项目是免费的——如果你觉得我们的评审有帮助,请考虑分享它们 ✨
帮我变得更有用!请在每条评论上点👍或👎,我会根据你的反馈来改进后续的评审。
Original comment in English

Hey - I've reviewed your changes and they look great!


Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@keeees keeees merged commit 68fdf5d into develop Apr 1, 2026
1 check passed
@myhMARS myhMARS deleted the refactor/redis-lock branch April 1, 2026 10:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants