
@Asher-XunZhang Asher-XunZhang commented Dec 22, 2025

Description

Motivation: [RFC]: Mooncake-Conductor: Design and Implementation of a Global Scheduler Module for KV-Cache-Centric Disaggregated Architecture #977

Add a KV event publishing system. It consists of event types and an asynchronous sender, a KV event publisher based on ZMQ. Joint work with @Liziqi-77 and @yejj710.

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

I've added two new unit tests: kv_event_publisher_test and thread_safe_queue_test.

Checklist

  • I have performed a self-review of my own code.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@gemini-code-assist
Contributor

Summary of Changes

Hello @Asher-XunZhang, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new, asynchronous KV event publishing system to the Mooncake Store, leveraging ZeroMQ for efficient and scalable event dissemination. The system is designed to provide real-time insights into key-value cache operations, such as block storage, updates, and removals, by publishing structured event data. It includes new data models for event information, a robust ZeroMQ-based publisher with configurable operational parameters, and integrates these capabilities deeply into the core client and master services. This enhancement aims to improve observability and enable external systems to react to changes within the Mooncake Store.

Highlights

  • New KV Event Publishing System: Introduced a comprehensive KV event publishing system for Mooncake Store, enabling asynchronous publication of key-value cache events via ZeroMQ.
  • ZeroMQ and MessagePack Integration: Integrated cppzmq and msgpack-cxx libraries to facilitate efficient and structured event communication, including their installation from source in the dependencies.sh script.
  • Event Data Structures: Defined new C++ data structures, including KVCacheEvent (base class), StoreEventInfo (detailed block metadata), BlockStoreEvent, BlockUpdateEvent, RemoveAllEvent, and EventBatch for event serialization and handling.
  • Asynchronous Event Publisher: Implemented ZmqEventPublisher for asynchronous event handling, featuring a thread-safe queue, batch processing, configurable parameters (e.g., endpoint, batch size, send interval), and a replay mechanism.
  • Core Service Integration: Modified Client, MasterClient, MasterService, PyClient, RealClient, and WrappedMasterService to incorporate event publishing logic, allowing events to be published during key operations like PutEnd, Remove, and BatchEvict.
  • Configuration and Metrics: Added extensive configuration options for the KV event publisher within MasterConfig and related builder classes, along with new metrics for monitoring publisher activity and performance.
  • New Unit Tests: Included new unit tests (kv_event_publisher_test and thread_safe_queue_test) to ensure the reliability and performance of the new event publishing system and its underlying thread-safe queue.
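The batching behaviour mentioned in the highlights (a configurable batch size and send interval) can be illustrated with a small flush-policy sketch. This is not the PR's ZmqEventPublisher: the class name `SketchBatcher`, its methods, and the use of plain strings as events are all assumptions made for illustration, and the actual ZeroMQ send and MessagePack serialization are left out.

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch of a size-or-interval flush policy.
// Names and types here are illustrative, not taken from the PR.
class SketchBatcher {
public:
    using Clock = std::chrono::steady_clock;

    SketchBatcher(std::size_t max_batch_size,
                  std::chrono::milliseconds send_interval,
                  Clock::time_point start)
        : max_batch_size_(max_batch_size),
          send_interval_(send_interval),
          last_flush_(start) {}

    // Queue one already-serialized event for the next batch.
    void add(std::string event) { pending_.push_back(std::move(event)); }

    // A batch is due when it is full, or when it is non-empty and the
    // send interval has elapsed since the previous flush.
    bool should_flush(Clock::time_point now) const {
        if (pending_.empty()) return false;
        return pending_.size() >= max_batch_size_ ||
               now - last_flush_ >= send_interval_;
    }

    // Hand the pending events to the caller and restart the interval.
    std::vector<std::string> flush(Clock::time_point now) {
        last_flush_ = now;
        std::vector<std::string> out;
        out.swap(pending_);
        return out;
    }

private:
    std::size_t max_batch_size_;
    std::chrono::milliseconds send_interval_;
    Clock::time_point last_flush_;
    std::vector<std::string> pending_;
};
```

Passing the current time in as a parameter keeps the policy deterministic and easy to unit test; a real sender thread would presumably call `Clock::now()` and write the flushed batch to its socket.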


@Asher-XunZhang Asher-XunZhang changed the title [WIP] Update KV event publisher [WIP] Update KV event publish system Dec 22, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive KV event publishing system using ZMQ and MessagePack. The changes are extensive, adding new dependencies, configurations, C++ classes for events and the publisher, and integrating event publishing into the master service logic. The implementation includes a thread-safe queue, a thread pool for async operations, and a robust ZMQ publisher with features like batching and a replay buffer. The code is generally well-structured. However, I've identified some areas for improvement regarding security in the dependency installation script, maintainability in serialization code, and consistency in code comments. I've also found a minor memory leak in the new tests.
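For readers unfamiliar with the term, a replay buffer of the kind the review mentions can be sketched as a bounded history of sent batches keyed by sequence number. The following is a minimal illustration under assumed names (`SketchReplayBuffer`, `record`, `replay_from`); how the PR actually numbers, stores, and serves replayed batches is not shown in this conversation.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch of a bounded replay buffer; not the PR's code.
class SketchReplayBuffer {
public:
    using Entry = std::pair<std::uint64_t, std::string>;

    explicit SketchReplayBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Remember a sent batch and return the sequence number assigned to it.
    // When the buffer is full, the oldest batch is dropped.
    std::uint64_t record(std::string payload) {
        const std::uint64_t seq = next_seq_++;
        if (capacity_ == 0) return seq;  // replay disabled
        if (entries_.size() == capacity_) entries_.pop_front();
        entries_.emplace_back(seq, std::move(payload));
        return seq;
    }

    // Return every retained batch whose sequence number is >= start_seq,
    // oldest first. Batches already evicted cannot be replayed.
    std::vector<Entry> replay_from(std::uint64_t start_seq) const {
        std::vector<Entry> out;
        for (const auto& entry : entries_) {
            if (entry.first >= start_seq) out.push_back(entry);
        }
        return out;
    }

private:
    std::size_t capacity_;
    std::uint64_t next_seq_ = 0;
    std::deque<Entry> entries_;
};
```

A subscriber that notices a gap in sequence numbers could ask for `replay_from(first_missing_seq)`; whether the gap can be filled depends on the chosen capacity.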

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a KV event publishing system for Mooncake Store, implementing an asynchronous event publisher based on ZeroMQ with comprehensive batch processing capabilities. The system enables publishing of block store, block update, and remove events through a thread-safe queue mechanism.

Key Changes:

  • Implemented ZmqEventPublisher with async event publishing and batch processing
  • Added ThreadSafeQueue for MPSC (Multiple Producer Single Consumer) scenarios with timeout support
  • Integrated event publishing into master service operations (PutEnd, Remove, RemoveAll, eviction)
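As a rough illustration of the queue described in the key changes, here is a minimal multi-producer, single-consumer queue with a timed pop and a batch pop, built only on the standard library. It is a sketch under assumptions: the class name `SketchQueue` and the method names `push`, `pop_for`, and `pop_batch` are hypothetical and do not reflect the interface in `thread_safe_queue.h`.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical sketch; not the PR's ThreadSafeQueue.
template <typename T>
class SketchQueue {
public:
    // Producers: safe to call from any number of threads.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(value));
        }
        not_empty_.notify_one();
    }

    // Consumer: wait up to `timeout` for one item; nullopt on timeout.
    std::optional<T> pop_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!not_empty_.wait_for(lock, timeout,
                                 [this] { return !items_.empty(); })) {
            return std::nullopt;
        }
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }

    // Consumer: wait up to `timeout` for the first item, then drain up to
    // `max_items` without waiting again. May return an empty vector.
    std::vector<T> pop_batch(std::size_t max_items,
                             std::chrono::milliseconds timeout) {
        std::vector<T> batch;
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait_for(lock, timeout,
                            [this] { return !items_.empty(); });
        while (!items_.empty() && batch.size() < max_items) {
            batch.push_back(std::move(items_.front()));
            items_.pop_front();
        }
        return batch;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
};
```

The predicate form of `wait_for` guards against spurious wakeups, and draining under a single lock acquisition is what makes the batch pop cheaper than repeated single pops.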

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 14 comments.

| File | Description |
| --- | --- |
| mooncake-store/tests/thread_safe_queue_test.cpp | Comprehensive unit tests for ThreadSafeQueue with 1223 lines covering basic operations, timeouts, batch operations, MPSC scenarios, and stress tests |
| mooncake-store/tests/kv_event_publisher_test.cpp | Unit tests for ZmqEventPublisher covering serialization, concurrent publishing, graceful shutdown, and performance |
| mooncake-store/tests/CMakeLists.txt | Added test targets for thread_safe_queue_test and kv_event_publisher_test with ZMQ/msgpack dependencies |
| mooncake-store/src/kv_event_publisher.cpp | Core implementation of ZmqEventPublisher with 740 lines including smart port binding, batch sending, and replay functionality |
| mooncake-store/src/master_service.cpp | Integrated event publisher into master service lifecycle and key operations (PutEnd, Remove, eviction) |
| mooncake-store/src/rpc_service.cpp | Updated RPC wrappers to propagate StoreEventInfo through PutEnd and BatchPutEnd calls |
| mooncake-store/src/real_client.cpp | Added StoreEventInfo parameter to batch_put_from_multi_buffers methods |
| mooncake-store/src/master.cpp | Added command-line flags and configuration loading for KV event publisher (lines 104-650) |
| mooncake-store/src/client_service.cpp | Updated BatchPut flow to create and propagate key-event info mappings |
| mooncake-store/src/CMakeLists.txt | Added cppzmq and msgpack-cxx dependencies to mooncake_store library |
| mooncake-store/include/thread_safe_queue.h | Header-only implementation of ThreadSafeQueue with blocking/non-blocking operations and batch pop support |
| mooncake-store/include/thread_pool.h | Added header guards to thread pool header |
| mooncake-store/include/kv_event_publisher.h | Main header aggregating event, config, and publisher headers |
| mooncake-store/include/kv_event/kv_event_zmq_publisher.h | ZmqEventPublisher class declaration with stats, async publishing, and resource management |
| mooncake-store/include/kv_event/kv_event_publisher_config.h | Configuration structure for event publisher with network, performance, and batching parameters |
| mooncake-store/include/kv_event/kv_event.hpp | Event type definitions (BlockStoreEvent, BlockUpdateEvent, RemoveAllEvent) and EventBatch with msgpack serialization |
| mooncake-store/include/master_service.h | Added publisher member, GetPublisherStats method, and GetReplicasDescriptorList helper |
| mooncake-store/include/master_config.h | Added enable_kv_event_publish and kv_event_publisher_config to all config classes |
| mooncake-store/include/master_client.h | Updated PutEnd and BatchPutEnd signatures to accept StoreEventInfo |
| mooncake-store/include/rpc_service.h | Updated RPC method signatures with StoreEventInfo parameters |
| mooncake-store/include/pyclient.h | Added StoreEventInfo parameter to batch_put_from_multi_buffers interface |
| mooncake-store/include/real_client.h | Updated batch_put_from_multi_buffers signatures |
| mooncake-store/include/dummy_client.h | Updated batch_put_from_multi_buffers signature for consistency |
| mooncake-store/include/client_service.h | Added CreateKeyEventInfosMap and updated BatchPut/FinalizeBatchPut signatures |
| mooncake-integration/store/store_py.cpp | Added Python binding for StoreEventInfo class |
| dependencies.sh | Added libzmq3-dev system package and installation scripts for cppzmq and msgpack-cxx |


@Asher-XunZhang Asher-XunZhang changed the title [WIP] Update KV event publish system [WIP][Store][Feature] Update KV event publish system Dec 23, 2025
@Asher-XunZhang Asher-XunZhang force-pushed the zmq_event_publisher branch 2 times, most recently from b5a750f to 5b4f29c Compare December 24, 2025 09:02
@Asher-XunZhang Asher-XunZhang force-pushed the zmq_event_publisher branch 2 times, most recently from ef997de to f241a70 Compare December 25, 2025 12:06
@Asher-XunZhang Asher-XunZhang changed the title [WIP][Store][Feature] Update KV event publish system [Store][Feature] Update KV event publish system Dec 26, 2025
@stmatengss
Collaborator

@Asher-XunZhang, could you split this PR into smaller PRs? I will assign reviewers for expedited review.

@Asher-XunZhang
Author

> @Asher-XunZhang, could you split this PR into smaller PRs? I will assign reviewers for expedited review.

@stmatengss No problem, I have split this PR. The first PR, #1306, contains the interface declarations. The PRs for the remaining implementation and related dependencies need to wait until that PR is merged; otherwise CI compilation will fail.
