diff --git a/.cursorrules b/.cursorrules
deleted file mode 100644
index a26d216..0000000
--- a/.cursorrules
+++ /dev/null
@@ -1,26 +0,0 @@
-Always respond in 中文
-不要回答重复的内容(如我提问中的代码)
-
-此项目名为deepx
-项目路径为/home/lipeng/code/ai/deepx
-项目分为3部分
-1. 前端。python库的接口风格参考pytorch,其他语言如go,java,c,rust等,后续设计完善。
-2. 调度器,待设计
-3. 执行器,使用c++,cuda,metal,omp simd等,实现不同executor的算子的前向和反向
-
-关于概念
-deepx.Tensor仅仅就是一个tensor,不像pytorch的tensor,一个tensor其实包含了自身和梯度2个tensor的数据
-
-关于任何编程语言
-注重设计函数时,通过多级的子函数,实现层级模块化分解
-
-关于c++
-我的环境为ubuntu22,项目是c++17,使用cmake编译,
-返回c++代码区分header和source文件
-由于作者是c++新手,请仔细检查指针和引用,对deepx这种密集计算任务,不要使用智能指针,但注意内存泄漏,函数返回对象等
-
-关于python
-贴近pytorch的接口风格,不要增加任何注释,我会手动添加注释
-
-关于doc目录
-采用Sphinx构建,使用reStructuredText格式
\ No newline at end of file
diff --git a/.github/workflows/executor-memcuda.yml b/.github/workflows/executor-memcuda.yml
new file mode 100644
index 0000000..4dab250
--- /dev/null
+++ b/.github/workflows/executor-memcuda.yml
@@ -0,0 +1,80 @@
+name: op/cuda-linux Build
+on:
+  push:
+    paths:
+      - 'executor/mem-cuda/**'
+  pull_request:
+    paths:
+      - 'executor/mem-cuda/**'
+env:
+  CUDA_VERSION: "12.9.1"
+  CUDA_MAJOR_VERSION: "12"
+  CUDNN_VERSION: "8.9.7.29"
+  CUTLASS_VERSION: "3.4.1"
+
+jobs:
+  build:
+    strategy:
+      matrix:
+        os: [ubuntu-22.04]
+        backend: [cuda]
+    runs-on: ${{ matrix.os }}
+
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      # 使用NVIDIA官方Docker镜像,避免依赖安装问题
+      - name: Set up Docker
+        uses: docker/setup-buildx-action@v2
+
+      # 使用预装CUDA和工具的NVIDIA容器
+      # NOTE(review): workflow env vars must be referenced through the `env`
+      # context inside `${{ }}` expressions — a bare `${{ CUDA_VERSION }}` is
+      # an expression syntax error. Fixed to `${{ env.CUDA_VERSION }}`.
+      - name: Build and Test in Docker Container
+        run: |
+          docker run --rm -v ${{ github.workspace }}:/workspace \
+            -w /workspace \
+            nvidia/cuda:${{ env.CUDA_VERSION }}-devel-ubuntu24.04 \
+            /bin/bash -c "
+            # 安装系统依赖
+            apt-get update && \
+            DEBIAN_FRONTEND=noninteractive apt-get install -y \
+              build-essential \
+              cmake \
+              libyaml-cpp-dev \
+              libgtest-dev \
+              clang \
+              git \
+              ninja-build \
+              ccache \
+              wget \
+              && \
+
+            # 安装 cuDNN - 使用更新的下载链接格式
+            wget 
https://developer.download.nvidia.com/compute/cudnn/redist/cudnn/linux-x86_64/cudnn-linux-x86_64-${CUDNN_VERSION}_cuda${CUDA_MAJOR_VERSION}-archive.tar.xz && \ + tar -xf cudnn-linux-x86_64-${CUDNN_VERSION}_cuda${CUDA_MAJOR_VERSION}-archive.tar.xz -C /usr/local && \ + + # 安装 CUTLASS - 仅安装头文件而不是共享库 + git clone --depth 1 --branch v${CUTLASS_VERSION} https://github.com/NVIDIA/cutlass.git && \ + cd cutlass && \ + # 直接复制头文件到系统目录 + cp -r include/* /usr/local/include/ && \ + cd /workspace && \ + + # 构建 common 库 + cd executor/cpp-common && \ + mkdir -p build && cd build && \ + cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER_LAUNCHER=ccache -GNinja .. && \ + ninja && \ + + # 构建 CUDA 执行器 + cd ../../mem-cuda && \ + mkdir -p build && cd build && \ + cmake -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \ + -DCUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda \ + -DCMAKE_CUDA_ARCHITECTURES=\"60;70;75;80;86\" \ + -DCUTLASS_DIR=/usr/local \ + -GNinja .. && \ + ninja + " \ No newline at end of file diff --git a/.github/workflows/executor-cuda-linux.yml b/.github/workflows/executor-op-cuda-linux.yml similarity index 100% rename from .github/workflows/executor-cuda-linux.yml rename to .github/workflows/executor-op-cuda-linux.yml diff --git a/.github/workflows/executor-ompsimd-linux.yml b/.github/workflows/executor-op-ompsimd-linux.yml similarity index 100% rename from .github/workflows/executor-ompsimd-linux.yml rename to .github/workflows/executor-op-ompsimd-linux.yml diff --git a/Agents.md b/Agents.md new file mode 100644 index 0000000..ef5709a --- /dev/null +++ b/Agents.md @@ -0,0 +1,20 @@ +## agent 规则 ++ This is the only AGENTS.md, there are no recursive AGENTS.md ++ When you are working on a bug, first create a standalone file that reproduces the bug and verify it fails in the expected way. Use this to test if your changes work. Once the change is passing, find an appropriate test file to add the test to and make sure to follow local conventions on the test file. 
++ Always respond in 中文,不要回答重复的内容(如我提问中的代码) + +## deepx的架构 + +项目分为3部分 +1. 前端。python库的接口风格参考pytorch +2. 编译,调度器,待设计 +3. 执行器,使用c++,cuda,metal,omp simd等,实现不同executor的算子 + +# 关于deepx的细节概念 ++ deepx.Tensor仅仅就是一个tensor,不像pytorch的tensor,一个tensor其实包含了自身和梯度2个tensor的数据 + + +贴近pytorch的接口风格,不要增加任何注释,我会手动添加注释 + +关于doc目录 +采用Sphinx构建,使用reStructuredText格式 \ No newline at end of file diff --git a/executor/mem-cuda/CMakeLists.txt b/executor/mem-cuda/CMakeLists.txt new file mode 100644 index 0000000..6cdd76c --- /dev/null +++ b/executor/mem-cuda/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.18) +project(mem-cuda LANGUAGES CXX) + +find_package(Threads REQUIRED) +find_package(CUDAToolkit REQUIRED) + +# Require redis++ (C++ client) and its hiredis dependency +find_path(REDISPP_INCLUDE redis++/redis++.h) +find_library(HIREDIS_LIB hiredis) + +add_subdirectory(src) +add_subdirectory(tools) +add_subdirectory(test) + +message(STATUS "mem-cuda: CUDA toolkit at ${CUDAToolkit_ROOT}") +message(STATUS "mem-cuda: redis++ include: ${REDISPP_INCLUDE}, hiredis lib: ${HIREDIS_LIB}") diff --git a/executor/mem-cuda/README.md b/executor/mem-cuda/README.md index 2f7fa35..e5a8871 100644 --- a/executor/mem-cuda/README.md +++ b/executor/mem-cuda/README.md @@ -3,89 +3,93 @@ 本目录用于设计/实现单机多进程的 GPU Tensor 统一存储面(CUDA IPC),并通过 Redis 做 name → IPC handle 的集中注册与控制。 ## 目标 -- 单机内多进程共享**可命名**Tensor(同名即同一块 GPU 内存)。 -- 通过 Redis 维护 name、shape、dtype、device、IPC handle 等元信息。 -- 通过 Redis List 接收创建/获取/删除指令,实现统一的控制面。 +- 管理单机内多进程共享的堆Tensor + + 堆tensor不会随着计算完成而被回收 + + 而栈tensor则是计算过程的中间变量,可以被立即回收 +- 通过 Redis 维护 name、shape、dtype、device、IPC handle 等元信息,供mem-cuda进程、计算进程访问。 +- 通过 Redis List 接收创建/获取/删除指令,实现统一的控制面。并提供redis mem api以及example ## 设计概述 ### 1) Redis 元数据(KV/Hash) 对每个 Tensor 名称建立一个 Hash: -- `name`: string - `dtype`: string(如 f32/i8 等) - `shape`: string/json(如 "[2,3,4]") +- `ctime`: int64 +- `node`: string(owner 节点/主机标识) - `device`: int(GPU id) - `bytes`: int -- `ipc_handle`: binary/base64 -- `owner_pid`: int +- 
`ipc_handle`: binary - `refcount`: int -- `ctime/mtime`: int64 +无需设计tensor owner ### 2) Redis 指令队列(List) -用 list 作为控制通道(生产者/消费者): -- list key: `tensor:cmd` -- 指令格式建议为 JSON: - ```json - {"op":"create|get|delete", "name":"X", "dtype":"f32", "shape":[2,3], "device":0} - ``` -- 处理流程: - - **create**: 分配 GPU 内存 → `cudaIpcGetMemHandle` → 写入 Hash - - **get**: 读取 Hash → `cudaIpcOpenMemHandle` - - **delete**: `refcount--`,为 0 时释放 GPU 内存并删除 Hash +控制通道 list key: `tensor_lifecycle`。 +指令 JSON: +```json +{"op":"create|get|delete", "name":"X", "dtype":"f32", "shape":[2,3], "device":0, "pid":123, "node":"n1"} +``` +处理流程: +- **create**: 分配 GPU 内存 → `cudaIpcGetMemHandle` → 写入 Hash(state=ready, refcount=1) +- **get**: 读取 Hash → `cudaIpcOpenMemHandle` → refcount++ +- **delete**: refcount--,为 0 时释放 GPU 内存并删除 Hash ### 3) CUDA IPC 基本流程 -- `cudaIpcGetMemHandle`:将 `cudaMalloc` 的指针导出为 handle +- `cudaIpcGetMemHandle`:将 `cudaMalloc` 指针导出为 handle - `cudaIpcOpenMemHandle`:其他进程映射同一块 GPU 内存 -- 仅限**同机**;需保证 device id 一致 -- 跨 stream 写读,需要显式同步(事件/流同步策略) +- 仅限同机;需保证 device id 一致 +- 跨 stream 写读需要显式同步(事件/流同步策略) ## 显存池方案 -你的需求是:**参考 PyTorch 的显存池管理**。这里给出两种落地路线: -### 方案 A:接入成熟开源显存池(推荐) -可选项目: - **RMM (RAPIDS Memory Manager)** - 优点:成熟、支持 pool/async allocator、统计完善 - 适合:对稳定性与可观察性要求高的生产环境 -- **CNMeM (NVIDIA)** - - 优点:轻量、易集成 - - 适合:需要最小依赖的场景 -- **CUB caching allocator** - - 优点:性能好、实现简单 - - 适合:希望直接嵌入 CUDA 代码路径 - -> 选择建议:优先 RMM;想保持最小依赖可用 CNMeM 或 CUB。 - -### 方案 B:自研简化版显存池(AI 方案) -如果不引入外部依赖,可先实现一个简化版池: -- 维护按 size 分桶的 free-list(如 1MB、2MB、4MB…) -- 分配时优先复用空闲块,不足时 `cudaMalloc` 新块 -- 回收时挂回 free-list,不立刻 `cudaFree` -- 支持 `recordStream` / `event` 延迟回收,避免跨流释放风险 -**建议先实现 MVP**: -1) 单 GPU -2) 只支持 `create/get/delete` -3) dtype 限定 f32 -4) 单进程先跑通,再放开多进程 + IPC +其他op计算进程可以使用CUB。 -## 安全与一致性 -- Redis 写入与 refcount 需要原子操作(Lua 脚本/事务) -- 崩溃恢复:定期清理 owner_pid 不存在的条目 -- IPC handle 需与 device id 配对,否则会映射失败 - -## 目录建议 +## 目录结构(具体方案) ``` mem-cuda/ README.md doc/ src/ - registry/ # Redis 元数据与命令处理 - allocator/ # 显存池实现或适配层 - ipc/ # 
cudaIpcGet/Open 封装 + ipc/ # CUDA IPC 封装 + ipc.h + ipc.cpp + ipc_guard.h # 设备一致性与错误处理 + runtime/ # 运行时控制(指令/同步) + lifecycle.h + lifecycle.cpp + sync.h + sync.cpp + common/ + status/json/logging + test/ + ipc_demo.cpp + lifecycle_demo.cpp ``` -## 后续工作清单 -- [ ] 选定显存池方案(RMM / CNMeM / CUB / 自研) -- [ ] 定义 Redis 数据结构与命令协议 -- [ ] 编写 IPC 封装与单机多进程 demo -- [ ] 建立错误恢复与 GC 机制 +模块职责: +- `ipc/`: CUDA IPC handle 导出/打开/关闭封装。 +- `runtime/`: 指令消费/路由与跨 stream 同步策略。 +- `common/`: 状态码、JSON 解析、日志等公共工具聚合。 + +## 后续工作清单(分阶段) +- [ ] 阶段 0:确定目录与接口(完成本 README 细化) +- [ ] 阶段 1:实现 `lifecycle/` +- [ ] 阶段 2:实现 `ipc/` 的最小实现(f32, 单 GPU) +- [ ] 阶段 3:补齐 `sync/` 策略与崩溃恢复/GC + +## 构建依赖与示例 + +- 必要系统依赖:CUDA Toolkit (兼容 CMake `CUDAToolkit`), `cmake` >= 3.18, `make`。 +- Redis C++ 客户端:必须安装 `redis++`(redis-plus-plus) +- RMM 库 + +示例构建命令(在 `executor/mem-cuda` 目录下): + +```bash +mkdir -p build && cd build +cmake .. -DCMAKE_BUILD_TYPE=Release +make -j$(nproc) +``` diff --git a/executor/mem-cuda/src/CMakeLists.txt b/executor/mem-cuda/src/CMakeLists.txt new file mode 100644 index 0000000..2763618 --- /dev/null +++ b/executor/mem-cuda/src/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.18) + +# src/CMakeLists.txt - memcuda core (clean) + +set(SRCS + registry/redis_client.cpp + registry/registry.cpp + ipc/ipc.cpp + allocator/cuda_pool.cpp + runtime/lifecycle.cpp + runtime/sync.cpp + common/common.cpp +) + +add_library(memcuda STATIC ${SRCS}) +target_include_directories(memcuda PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(memcuda PRIVATE CUDA::cudart Threads::Threads) + +# Require redis++ (redis-plus-plus) and hiredis +find_path(REDISPP_INCLUDE redis++/redis++.h) +find_library(HIREDIS_LIB hiredis) + +message(STATUS "Found redis++ include: ${REDISPP_INCLUDE}") +message(STATUS "Found hiredis lib: ${HIREDIS_LIB}") + +target_include_directories(memcuda PRIVATE ${REDISPP_INCLUDE}) +target_link_libraries(memcuda PRIVATE ${HIREDIS_LIB}) +target_compile_definitions(memcuda PRIVATE USE_REDISPP=1) 
diff --git a/executor/mem-cuda/src/registry/CMakeLists.txt b/executor/mem-cuda/src/registry/CMakeLists.txt new file mode 100644 index 0000000..138c9ad --- /dev/null +++ b/executor/mem-cuda/src/registry/CMakeLists.txt @@ -0,0 +1,4 @@ +cmake_minimum_required(VERSION 3.18) + +# registry CMake - only used to install Lua scripts for runtime +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/lua_scripts DESTINATION share/mem-cuda) diff --git a/executor/mem-cuda/src/registry/redis_client.cpp b/executor/mem-cuda/src/registry/redis_client.cpp new file mode 100644 index 0000000..acbc32a --- /dev/null +++ b/executor/mem-cuda/src/registry/redis_client.cpp @@ -0,0 +1,61 @@ +#include "redis_client.h" + +#include + +namespace memcuda { + +RedisClient::RedisClient(const std::string& uri) { + auto* r = new sw::redis::Redis(uri); + redis_impl_ = static_cast(r); +} + +RedisClient::~RedisClient() { + auto* r = static_cast(redis_impl_); + delete r; +} + +bool RedisClient::HSet(const std::string& key, const std::string& field, const std::string& value) { + auto* r = static_cast(redis_impl_); + return r->hset(key, field, value); +} + +bool RedisClient::HGet(const std::string& key, const std::string& field, std::string& out) const { + auto* r = static_cast(redis_impl_); + auto val = r->hget(key, field); + if (!val) { + return false; + } + out = *val; + return true; +} + +std::unordered_map RedisClient::HGetAll(const std::string& key) const { + auto* r = static_cast(redis_impl_); + std::unordered_map res; + r->hgetall(key, std::inserter(res, res.begin())); + return res; +} + +long long RedisClient::LPush(const std::string& key, const std::string& value) { + auto* r = static_cast(redis_impl_); + return r->lpush(key, value); +} + +bool RedisClient::BRPop(const std::string& key, int timeout_seconds, std::string& out) const { + auto* r = static_cast(redis_impl_); + auto item = r->brpop(key, std::chrono::seconds(timeout_seconds)); + if (!item) { + return false; + } + out = item->second; + return 
true; +} + +std::string RedisClient::Eval(const std::string& script, + const std::vector& keys, + const std::vector& args) const { + auto* r = static_cast(redis_impl_); + return r->eval(script, keys.begin(), keys.end(), args.begin(), args.end()); +} + +} diff --git a/executor/mem-cuda/src/registry/redis_client.h b/executor/mem-cuda/src/registry/redis_client.h new file mode 100644 index 0000000..5b5f0b9 --- /dev/null +++ b/executor/mem-cuda/src/registry/redis_client.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include + +namespace memcuda { + +class RedisClient { +public: + explicit RedisClient(const std::string& uri); + ~RedisClient(); + + bool HSet(const std::string& key, const std::string& field, const std::string& value); + bool HGet(const std::string& key, const std::string& field, std::string& out) const; + std::unordered_map HGetAll(const std::string& key) const; + + long long LPush(const std::string& key, const std::string& value); + bool BRPop(const std::string& key, int timeout_seconds, std::string& out) const; + + std::string Eval(const std::string& script, + const std::vector& keys, + const std::vector& args) const; + +private: + void* redis_impl_; +}; + +} diff --git a/executor/mem-cuda/src/registry/registry.cpp b/executor/mem-cuda/src/registry/registry.cpp new file mode 100644 index 0000000..0bd8115 --- /dev/null +++ b/executor/mem-cuda/src/registry/registry.cpp @@ -0,0 +1,88 @@ +#include "registry.h" + +#include +#include + +namespace memcuda { + +Registry::Registry(RedisClient* client, const std::string& lua_dir) + : client_(client), lua_dir_(lua_dir) {} + +std::string Registry::Key(const std::string& name) const { + return "tensor:" + name; +} + +std::string Registry::Script(const std::string& file) const { + std::ifstream in(lua_dir_ + "/" + file, std::ios::in | std::ios::binary); + std::ostringstream ss; + ss << in.rdbuf(); + return ss.str(); +} + +std::string Registry::CreateOrGet(const std::string& name, + const std::string& dtype, + 
const std::string& shape, + long long device, + long long bytes, + const std::string& node, + long long pid, + long long ctime, + const std::string& ipc_handle) { + std::vector keys{Key(name)}; + std::vector args{ + dtype, + shape, + std::to_string(device), + std::to_string(bytes), + node, + std::to_string(pid), + std::to_string(ctime), + ipc_handle + }; + return client_->Eval(Script("create_or_get.lua"), keys, args); +} + +long long Registry::RefInc(const std::string& name) { + std::vector keys{Key(name)}; + std::vector args; + auto res = client_->Eval(Script("ref_inc.lua"), keys, args); + return std::stoll(res); +} + +long long Registry::RefDec(const std::string& name) { + std::vector keys{Key(name)}; + std::vector args; + auto res = client_->Eval(Script("ref_dec.lua"), keys, args); + return std::stoll(res); +} + +long long Registry::GcSweep(const std::string& node) { + std::vector keys; + std::vector args{node}; + auto res = client_->Eval(Script("gc_sweep.lua"), keys, args); + return std::stoll(res); +} + +bool Registry::GetMeta(const std::string& name, TensorMeta& out) const { + auto map = client_->HGetAll(Key(name)); + if (map.empty()) { + return false; + } + out.dtype = map["dtype"]; + out.shape = map["shape"]; + out.node = map["node"]; + out.ipc_handle = map["ipc_handle"]; + out.state = map["state"]; + if (map.count("device")) out.device = std::stoll(map["device"]); + if (map.count("bytes")) out.bytes = std::stoll(map["bytes"]); + if (map.count("refcount")) out.refcount = std::stoll(map["refcount"]); + if (map.count("owner_pid")) out.owner_pid = std::stoll(map["owner_pid"]); + if (map.count("ctime")) out.ctime = std::stoll(map["ctime"]); + return true; +} + +RedisClient* Registry::Client() const { + return client_; +} + +} diff --git a/executor/mem-cuda/src/registry/registry.h b/executor/mem-cuda/src/registry/registry.h new file mode 100644 index 0000000..91f8013 --- /dev/null +++ b/executor/mem-cuda/src/registry/registry.h @@ -0,0 +1,53 @@ +#pragma once + 
+#include +#include +#include + +#include "redis_client.h" + +namespace memcuda { + +struct TensorMeta { + std::string dtype; + std::string shape; + std::string node; + std::string ipc_handle; + long long device = 0; + long long bytes = 0; + long long refcount = 0; + long long owner_pid = 0; + long long ctime = 0; + std::string state; +}; + +class Registry { +public: + Registry(RedisClient* client, const std::string& lua_dir); + + std::string CreateOrGet(const std::string& name, + const std::string& dtype, + const std::string& shape, + long long device, + long long bytes, + const std::string& node, + long long pid, + long long ctime, + const std::string& ipc_handle); + + long long RefInc(const std::string& name); + long long RefDec(const std::string& name); + long long GcSweep(const std::string& node); + + bool GetMeta(const std::string& name, TensorMeta& out) const; + RedisClient* Client() const; + +private: + std::string Script(const std::string& file) const; + std::string Key(const std::string& name) const; + + RedisClient* client_; + std::string lua_dir_; +}; + +} diff --git a/executor/mem-cuda/src/runtime/lifecycle.cpp b/executor/mem-cuda/src/runtime/lifecycle.cpp new file mode 100644 index 0000000..52d4c3f --- /dev/null +++ b/executor/mem-cuda/src/runtime/lifecycle.cpp @@ -0,0 +1,87 @@ +#include "lifecycle.h" + +#include + +namespace memcuda { + +static std::string ExtractString(const std::string& json, const std::string& key) { + auto pos = json.find("\"" + key + "\""); + if (pos == std::string::npos) return ""; + pos = json.find(':', pos); + if (pos == std::string::npos) return ""; + pos++; + while (pos < json.size() && std::isspace(static_cast(json[pos]))) pos++; + if (pos >= json.size()) return ""; + if (json[pos] == '"') { + pos++; + auto end = json.find('"', pos); + if (end == std::string::npos) return ""; + return json.substr(pos, end - pos); + } + auto end = pos; + while (end < json.size() && json[end] != ',' && json[end] != '}' && 
!std::isspace(static_cast(json[end]))) end++; + return json.substr(pos, end - pos); +} + +static long long ExtractInt(const std::string& json, const std::string& key) { + auto s = ExtractString(json, key); + if (s.empty()) return 0; + return std::stoll(s); +} + +LifecycleWorker::LifecycleWorker(Registry* registry, const std::string& queue_key) + : registry_(registry), queue_key_(queue_key), running_(false) {} + +bool LifecycleWorker::Parse(const std::string& json, LifecycleCommand& out) const { + out.op = ExtractString(json, "op"); + out.name = ExtractString(json, "name"); + out.dtype = ExtractString(json, "dtype"); + out.shape = ExtractString(json, "shape"); + out.node = ExtractString(json, "node"); + out.device = ExtractInt(json, "device"); + out.pid = ExtractInt(json, "pid"); + return !out.op.empty() && !out.name.empty(); +} + +void LifecycleWorker::Handle(const LifecycleCommand& cmd) { + if (cmd.op == "create") { + registry_->CreateOrGet(cmd.name, cmd.dtype, cmd.shape, cmd.device, 0, cmd.node, cmd.pid, 0, ""); + return; + } + if (cmd.op == "get") { + registry_->RefInc(cmd.name); + return; + } + if (cmd.op == "delete") { + registry_->RefDec(cmd.name); + return; + } +} + +void LifecycleWorker::RunOnce(int timeout_seconds) { + if (!registry_) return; + auto* client = registry_->Client(); + if (!client) return; + std::string msg; + if (!client->BRPop(queue_key_, timeout_seconds, msg)) { + return; + } + LifecycleCommand cmd; + if (!Parse(msg, cmd)) { + return; + } + Handle(cmd); +} + +void LifecycleWorker::RunLoop(int timeout_seconds) { + running_.store(true); + while (running_.load()) { + RunOnce(timeout_seconds); + } +} + +void LifecycleWorker::Stop() { + running_.store(false); +} + +} diff --git a/executor/mem-cuda/src/runtime/lifecycle.h b/executor/mem-cuda/src/runtime/lifecycle.h new file mode 100644 index 0000000..65a9c06 --- /dev/null +++ b/executor/mem-cuda/src/runtime/lifecycle.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +#include 
"../registry/registry.h" + +namespace memcuda { + +struct LifecycleCommand { + std::string op; + std::string name; + std::string dtype; + std::string shape; + std::string node; + long long device = 0; + long long pid = 0; +}; + +class LifecycleWorker { +public: + LifecycleWorker(Registry* registry, const std::string& queue_key); + + void RunOnce(int timeout_seconds); + void RunLoop(int timeout_seconds); + void Stop(); + +private: + bool Parse(const std::string& json, LifecycleCommand& out) const; + void Handle(const LifecycleCommand& cmd); + + Registry* registry_; + std::string queue_key_; + std::atomic running_; +}; + +} diff --git a/executor/mem-cuda/src/runtime/sync.cpp b/executor/mem-cuda/src/runtime/sync.cpp new file mode 100644 index 0000000..650a387 --- /dev/null +++ b/executor/mem-cuda/src/runtime/sync.cpp @@ -0,0 +1,11 @@ +#include "sync.h" + +namespace memcuda { + +StreamSync::StreamSync() {} + +void StreamSync::Record() {} + +void StreamSync::Wait() {} + +} diff --git a/executor/mem-cuda/src/runtime/sync.h b/executor/mem-cuda/src/runtime/sync.h new file mode 100644 index 0000000..c7df35d --- /dev/null +++ b/executor/mem-cuda/src/runtime/sync.h @@ -0,0 +1,12 @@ +#pragma once + +namespace memcuda { + +class StreamSync { +public: + StreamSync(); + void Record(); + void Wait(); +}; + +} diff --git a/executor/mem-cuda/test/CMakeLists.txt b/executor/mem-cuda/test/CMakeLists.txt new file mode 100644 index 0000000..a1f1f7c --- /dev/null +++ b/executor/mem-cuda/test/CMakeLists.txt @@ -0,0 +1,28 @@ +# test/CMakeLists.txt + +add_executable(ipc_demo ipc_demo.cpp) +target_link_libraries(ipc_demo PRIVATE memcuda) + +add_executable(registry_demo registry_demo.cpp) +target_link_libraries(registry_demo PRIVATE memcuda) + +add_executable(lifecycle_demo lifecycle_demo.cpp) +target_link_libraries(lifecycle_demo PRIVATE memcuda) + +install(TARGETS ipc_demo registry_demo lifecycle_demo DESTINATION bin) +# test/CMakeLists.txt + +cmake_minimum_required(VERSION 3.18) + +# 
test/CMakeLists.txt (duplicated section removed)
+#
+# NOTE(review): the tail of this file previously repeated the three
+# add_executable()/target_link_libraries() pairs and the install() rule a
+# second time. Re-declaring a target name makes CMake configuration fail
+# with "add_executable called with duplicate target", so the duplicate
+# stanza was dropped; the targets declared above are the single definition.