diff --git a/CMakeLists.txt b/CMakeLists.txt index e283879..50007aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.14) -project(VeloTask VERSION 1.0.0 LANGUAGES CXX) +project(TaskFlow VERSION 1.0.0 LANGUAGES CXX) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) @@ -18,24 +18,40 @@ if(NOT nlohmann_json_FOUND) FetchContent_MakeAvailable(json) endif() -add_library(VeloTask INTERFACE) -add_library(VeloTask::VeloTask ALIAS VeloTask) +add_library(TaskFlow INTERFACE) +add_library(TaskFlow::TaskFlow ALIAS TaskFlow) -target_include_directories(VeloTask INTERFACE +target_include_directories(TaskFlow INTERFACE $ $ ) -target_link_libraries(VeloTask INTERFACE +target_link_libraries(TaskFlow INTERFACE Threads::Threads nlohmann_json::nlohmann_json ) if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) - option(VELOTASK_BUILD_EXAMPLES "Build example applications" ON) - if(VELOTASK_BUILD_EXAMPLES) - add_executable(velotask_example examples/main.cpp) - target_link_libraries(velotask_example PRIVATE VeloTask::VeloTask) + option(TASKFLOW_BUILD_EXAMPLES "Build example applications" ON) + if(TASKFLOW_BUILD_EXAMPLES) + # Build individual example executables + add_executable(basic_task_submission examples/basic_task_submission.cpp) + target_link_libraries(basic_task_submission PRIVATE TaskFlow::TaskFlow) + + add_executable(multiple_tasks examples/multiple_tasks.cpp) + target_link_libraries(multiple_tasks PRIVATE TaskFlow::TaskFlow) + + add_executable(task_with_failure examples/task_with_failure.cpp) + target_link_libraries(task_with_failure PRIVATE TaskFlow::TaskFlow) + + add_executable(task_with_progress examples/task_with_progress.cpp) + target_link_libraries(task_with_progress PRIVATE TaskFlow::TaskFlow) + + add_executable(task_with_result examples/task_with_result.cpp) + target_link_libraries(task_with_result PRIVATE TaskFlow::TaskFlow) + + add_executable(persistent_task examples/persistent_task.cpp) + target_link_libraries(persistent_task PRIVATE TaskFlow::TaskFlow) endif() endif() @@ -44,30 +60,30 @@ include(CMakePackageConfigHelpers) install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) -install(TARGETS VeloTask - EXPORT VeloTaskTargets +install(TARGETS TaskFlow + EXPORT TaskFlowTargets ) -install(EXPORT VeloTaskTargets - FILE VeloTaskTargets.cmake - NAMESPACE VeloTask:: - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/VeloTask +install(EXPORT TaskFlowTargets + FILE TaskFlowTargets.cmake + NAMESPACE TaskFlow:: + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/TaskFlow ) configure_package_config_file( - "${CMAKE_CURRENT_SOURCE_DIR}/VeloTaskConfig.cmake.in" - "${CMAKE_CURRENT_BINARY_DIR}/VeloTaskConfig.cmake" - INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/VeloTask + "${CMAKE_CURRENT_SOURCE_DIR}/cmake/TaskFlowConfig.cmake.in" + "${CMAKE_CURRENT_BINARY_DIR}/TaskFlowConfig.cmake" + INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/TaskFlow ) write_basic_package_version_file( - "${CMAKE_CURRENT_BINARY_DIR}/VeloTaskConfigVersion.cmake" + "${CMAKE_CURRENT_BINARY_DIR}/TaskFlowConfigVersion.cmake" VERSION ${PROJECT_VERSION} COMPATIBILITY SameMajorVersion ) install(FILES - "${CMAKE_CURRENT_BINARY_DIR}/VeloTaskConfig.cmake" - "${CMAKE_CURRENT_BINARY_DIR}/VeloTaskConfigVersion.cmake" - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/VeloTask + "${CMAKE_CURRENT_BINARY_DIR}/TaskFlowConfig.cmake" + "${CMAKE_CURRENT_BINARY_DIR}/TaskFlowConfigVersion.cmake" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/TaskFlow ) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..321ff36 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,248 @@ +# Contributing to TaskFlow + +Thank you for your interest in contributing to TaskFlow! We welcome contributions from everyone. This document provides guidelines and information for contributors. + +## 📋 Table of Contents + +- [Getting Started](#getting-started) +- [Development Environment](#development-environment) +- [Code Style](#code-style) +- [Building and Testing](#building-and-testing) +- [Submitting Changes](#submitting-changes) +- [Pull Request Process](#pull-request-process) +- [Reporting Issues](#reporting-issues) +- [License](#license) + +## 🚀 Getting Started + +### Prerequisites + +- **C++17** compatible compiler (GCC 7+, Clang 5+, MSVC 2017+) +- **CMake** 3.14 or higher +- **Git** for version control + +### Fork and Clone + +1. Fork the repository on GitHub +2. Clone your fork locally: + ```bash + git clone https://github.com/your-username/VeloTask.git + cd VeloTask + ``` +3. Add the upstream repository: + ```bash + git remote add upstream https://github.com/merlotqi/VeloTask.git + ``` + +### Dependencies + +TaskFlow uses the following dependencies: + +- **[nlohmann/json](https://github.com/nlohmann/json)**: For JSON data handling (automatically fetched via CMake if not found) +- **Threads**: System threading library + +## 🛠 Development Environment + +### Setting Up + +1. Ensure you have CMake installed +2. Configure the project: + ```bash + mkdir build && cd build + cmake -S .. -B . + ``` + +3. Build the project: + ```bash + cmake --build . + ``` + +### IDE Setup + +- **Visual Studio Code**: Use the provided `.vscode/` settings and extensions +- **CLion**: Automatically detects CMake configuration +- **VS Code with CMake Tools**: Recommended for best CMake integration + +## 💅 Code Style + +TaskFlow follows specific coding standards to maintain code quality and consistency. + +### Formatting + +- Use **clang-format** with the provided `.clang-format` configuration +- Based on Google style with 120 character line limit +- Run clang-format before committing: + ```bash + find . -name "*.cpp" -o -name "*.hpp" | xargs clang-format -i + ``` + +### Naming Conventions + +- **Classes/Structs**: PascalCase (e.g., `TaskManager`, `TaskCtx`) +- **Functions/Methods**: camelCase (e.g., `submit_task()`, `get_progress()`) +- **Variables**: snake_case (e.g., `task_id`, `progress_info`) +- **Constants**: SCREAMING_SNAKE_CASE (e.g., `MAX_THREADS`) +- **Namespaces**: lowercase (e.g., `taskflow`) + +### Code Guidelines + +- **C++17 Features**: Use modern C++17 features where appropriate +- **Error Handling**: Use exceptions for exceptional cases, return values for expected errors +- **Documentation**: Document public APIs with clear comments +- **Thread Safety**: Ensure thread safety for shared resources +- **Performance**: Consider performance implications of changes + +### File Organization + +``` +include/taskflow/ # Public headers +├── task_manager.hpp # Main task management +├── task_traits.hpp # Task trait definitions +├── task_ctx.hpp # Task execution context +├── state_storage.hpp # Internal state management +├── threadpool.hpp # Thread pool implementation +└── any_task.hpp # Type-erased task wrapper + +examples/ # Example programs +├── task_types.hpp # Shared task type definitions +├── basic_task_submission.cpp +├── multiple_tasks.cpp +└── ... +``` + +## 🏗 Building and Testing + +### Build Process + +```bash +# Configure +cmake -S . -B build + +# Build +cmake --build build + +# Build with specific configuration +cmake --build build --config Release +``` + +### Build Options + +- `TASKFLOW_BUILD_EXAMPLES=ON` (default): Build example programs +- `CMAKE_BUILD_TYPE=Debug|Release`: Build configuration + +### Running Examples + +After building, you can run individual examples: + +```bash +# Run basic task submission example +./build/basic_task_submission + +# Run multiple tasks example +./build/multiple_tasks + +# Run task with progress example +./build/task_with_progress +``` + +### Testing + +Currently, TaskFlow uses example programs for testing. Each example demonstrates specific functionality: + +- `basic_task_submission`: Basic task submission and execution +- `multiple_tasks`: Concurrent task execution +- `task_with_failure`: Error handling +- `task_with_progress`: Progress reporting +- `task_with_result`: Result storage +- `persistent_task`: Persistent task reawakening + +## 📝 Submitting Changes + +### Commit Guidelines + +- Use clear, descriptive commit messages +- Start with a verb in imperative mood (e.g., "Add", "Fix", "Update") +- Reference issue numbers when applicable (e.g., "Fix #123: Handle edge case") +- Keep commits focused on single changes + +### Example Commit Messages + +``` +Add support for custom progress types +Fix memory leak in task cancellation +Update documentation for persistent tasks +Refactor thread pool for better performance +``` + +### Branch Naming + +- Use descriptive branch names +- Prefix with feature type: `feature/`, `bugfix/`, `docs/`, `refactor/` + +``` +feature/add-custom-result-types +bugfix/handle-cancellation-race-condition +docs/update-contribution-guide +refactor/simplify-task-traits +``` + +## 🔄 Pull Request Process + +1. **Create a Branch**: Create a feature branch from `main` +2. **Make Changes**: Implement your changes with tests +3. **Test Locally**: Ensure all examples build and run correctly +4. **Format Code**: Run clang-format on your changes +5. **Commit**: Make focused commits with clear messages +6. **Push**: Push your branch to your fork +7. **Create PR**: Open a pull request against the main repository + +### Pull Request Template + +When creating a pull request, include: + +- **Title**: Clear, descriptive title +- **Description**: Detailed explanation of changes +- **Related Issues**: Reference any related issues +- **Testing**: Describe how you tested the changes +- **Breaking Changes**: Note any breaking changes + +### Code Review + +- Address review comments promptly +- Be open to feedback and suggestions +- Update your PR based on review feedback +- Keep the PR focused and avoid scope creep + +## 🐛 Reporting Issues + +### Bug Reports + +When reporting bugs, please include: + +- **Description**: Clear description of the issue +- **Steps to Reproduce**: Step-by-step instructions +- **Expected Behavior**: What should happen +- **Actual Behavior**: What actually happens +- **Environment**: OS, compiler version, CMake version +- **Code Sample**: Minimal code to reproduce the issue + +### Feature Requests + +For feature requests, include: + +- **Description**: What feature you'd like to see +- **Use Case**: Why this feature would be useful +- **Implementation Ideas**: Any thoughts on implementation +- **Alternatives**: Alternative approaches considered + +## 📄 License + +By contributing to TaskFlow, you agree that your contributions will be licensed under the same license as the project (see LICENSE file). + +## 🙏 Recognition + +Contributors will be acknowledged in the project documentation. We appreciate all contributions, from bug reports to major features! + +--- + +Thank you for contributing to TaskFlow! Your help makes this project better for everyone. 🎉 diff --git a/README.md b/README.md index 31b2098..7fd09f4 100644 --- a/README.md +++ b/README.md @@ -1,100 +1,204 @@ -# VeloTask +# TaskFlow -**VeloTask** is a lightweight, high-performance, and thread-safe C++17 asynchronous task management library. It provides a multi-level scheduling architecture including a priority-based task pool, a robust thread pool, and a centralized management singleton. +**TaskFlow** is a lightweight, high-performance, and thread-safe C++17 asynchronous task management library. It provides a simple yet powerful API for submitting and managing asynchronous tasks with progress tracking, cancellation, and error handling. ## 🚀 Key Features -* **Priority Scheduling**: Supports 5 levels of priority (`lowest`, `low`, `normal`, `high`, `critical`) with customizable values (0-255) using a sorted `multimap` queue. -* **Flexible Task Types**: -* **Class-based**: Inherit from `TaskBase` for complex logic. -* **Lambda-based**: Use `FunctionalTask` for quick, "fire-and-forget" or progress-monitored functions. -* **Batch Operations**: Submit and cancel multiple tasks in a single call for improved efficiency. -* **Automatic Retries**: Configurable retry mechanism for failed tasks with exponential backoff potential. -* **Task Dependencies**: Support for DAG (Directed Acyclic Graph) scheduling with inter-task dependencies to avoid manual ordering. -* **Life-cycle Management**: Distinguish between `disposable` (clean up after finish) and `persistent` (reusable/restartable) tasks. -* **Observability**: Integrated `TaskObserver` pattern to track progress, completion, and errors in real-time. -* **Smart Cleanup**: Background threads automatically prune historical task info based on time (TTL) and capacity (LRU) to prevent memory bloating. -* **Cross-Platform**: Thread naming support for both Windows (`SetThreadDescription`) and Linux (`pthread_setname_np`). +* **Simple API**: Submit tasks as lambda functions or callable objects +* **Flexible Observability**: Choose between no observation, basic state tracking, or full progress reporting +* **Cancellation Support**: Tasks can check for cancellation and handle it gracefully +* **Error Handling**: Comprehensive error reporting and state management +* **Persistent Tasks**: Reusable tasks that can be reawakened with new parameters +* **Thread-Safe**: Designed for concurrent access from multiple threads +* **C++17**: Modern C++ with concepts and constexpr where available +* **Cross-Platform**: Works on Windows, Linux, and macOS --- ## 🏗 Architecture -1. **TaskManager**: The primary singleton interface. Handles task registration, submission, and global status tracking. -2. **TaskPool**: Manages the life cycle of tasks, handles scheduling logic, and maintains the priority queue. -3. **ThreadPool**: A low-level execution engine that manages a fixed set of worker threads. -4. **TaskBase**: The abstract base class providing common functionality like cancellation, waiting, and state management. +1. **TaskManager**: The main singleton that manages task submission and execution +2. **TaskCtx**: Context object passed to tasks for state management and progress reporting +3. **StateStorage**: Internal storage for task states, progress, and errors +4. **Thread Pool**: Manages worker threads for task execution --- ## 💻 Quick Start -### 1. Define a Custom Task +### 1. Include the Header ```cpp -class MyDownloadTask : public velo::TaskBase { -public: - using TaskBase::TaskBase; - std::string getName() const override { return "Downloader"; } - std::string getDescription() const override { return "Downloads files via HTTP"; } - - void execute() override { - // Business logic here - for(int i = 0; i <= 100; i += 10) { - if (isCancelRequested()) return; - updateProgress({{"percent", i}}); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - finishExecute({{"status", "complete"}}); +#include +``` + +### 2. Submit a Task + +```cpp +// Get the task manager instance +auto& manager = taskflow::TaskManager::getInstance(); + +// Start processing (specify number of threads) +manager.start_processing(4); + +// Submit a simple task +auto task_id = manager.submit_task([](taskflow::TaskCtx& ctx) { + std::cout << "Task " << ctx.id << " is running" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ctx.success(); // Mark as successful +}); +``` + +### 3. Monitor Task State + +```cpp +// Query task state +auto state = manager.query_state(task_id); +if (state) { + std::cout << "Task state: " << static_cast(*state) << std::endl; +} +``` + +### 4. Task with Progress + +```cpp +auto progress_task = manager.submit_task([](taskflow::TaskCtx& ctx) { + for (int i = 0; i <= 100; i += 25) { + ctx.report_progress(static_cast(i) / 100.0f, "Step " + std::to_string(i)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } - void cleanup() override {} -}; + ctx.success(); +}); + +// Monitor progress +if (auto progress = manager.get_progress(progress_task)) { + std::cout << "Progress: " << progress->progress * 100.0f << "% - " << progress->message << std::endl; +} +``` + +### 5. Handle Errors + +```cpp +auto failing_task = manager.submit_task([](taskflow::TaskCtx& ctx) { + try { + // Some work that might fail + throw std::runtime_error("Something went wrong"); + } catch (const std::exception& e) { + ctx.failure(e.what()); + } +}); +// Check for errors +if (auto error = manager.get_error(failing_task)) { + std::cout << "Error: " << *error << std::endl; +} ``` -### 2. Register and Submit +### 6. Cancellation ```cpp -auto& manager = velo::TaskManager::getInstance(); +auto cancellable_task = manager.submit_task([](taskflow::TaskCtx& ctx) { + while (!ctx.is_cancelled()) { + // Do work, check for cancellation periodically + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + if (ctx.is_cancelled()) { + ctx.failure("Task was cancelled"); + } else { + ctx.success(); + } +}); -// Register the type -manager.register_task_type("download_service"); +// Cancel the task +manager.cancel_task(cancellable_task); +``` + +### 7. Task Results -// Submit via Type Name -std::string id = manager.submitTask("download_service", {{"url", "https://example.com"}}, - velo::TaskLifecycle::disposable, - velo::TaskPriority::high); +Tasks can now store execution results that persist beyond completion: -// OR Submit via Lambda -manager.submitTask([](const json& input, auto progress, auto is_cancelled) { - progress({{"step", "starting"}}); - return json({{"result", "ok"}}); +```cpp +auto result_task = manager.submit_task([](taskflow::TaskCtx& ctx) { + // Process data and create result + nlohmann::json result = { + {"processed_items", 42}, + {"success_rate", 0.95}, + {"output", "processed data"} + }; + + // Store result with task completion + ctx.success_with_result(taskflow::ResultPayload::json(result)); }); +// Retrieve result after completion +if (auto result = manager.get_result(result_task)) { + if (result->kind == taskflow::ResultKind::json) { + std::cout << "Result: " << result->data.json_data.dump() << std::endl; + } +} ``` -### 3. Monitor Progress +### 8. Persistent Tasks + +Persistent tasks can be reawakened with new parameters after completion: ```cpp -auto info = manager.getTaskInfo(id); -if (info) { - std::cout << "State: " << (int)info->state << std::endl; - std::cout << "Progress: " << info->currentProgressInfo.dump() << std::endl; +// Submit a persistent task +auto persistent_id = manager.submit_task([](taskflow::TaskCtx& ctx) { + std::cout << "Initial execution" << std::endl; + ctx.success(); +}, taskflow::TaskLifecycle::persistent); + +// Check if it's persistent +if (manager.is_persistent_task(persistent_id)) { + // Reawaken with new logic + manager.reawaken_task(persistent_id, [](taskflow::TaskCtx& ctx) { + std::cout << "Reawakened with new logic!" << std::endl; + ctx.success(); + }); } +``` + +--- + +## 🔧 Task Traits Configuration + +You can customize task behavior by specializing the `task_traits` template: + +```cpp +// Example: Define a custom task type with specific traits +struct MyProgressTask { + static constexpr taskflow::TaskObservability observability = taskflow::TaskObservability::progress; + static constexpr bool cancellable = true; +}; +// Use the custom task +MyProgressTask my_task; +auto task_id = manager.submit_task(my_task); ``` +### Available Task Capabilities + +- **Observability Levels**: + - `TaskObservability::none`: No observation capabilities + - `TaskObservability::basic`: Basic state observation (start/end only) + - `TaskObservability::progress`: Full observation with progress reporting + +- **Cancellation**: Set `cancellable = true` to enable cancellation support + +- **Lifecycle**: Choose between `TaskLifecycle::disposable` and `TaskLifecycle::persistent` + --- ## ⚙️ Configuration -Within `taskmanager.hpp`, you can adjust the following constants to fit your memory constraints: +The TaskManager automatically manages cleanup of completed tasks: -* `max_info_age_`: How long finished task records stay in memory (default: **24 hours**). -* `max_info_count_`: Maximum number of records before the LRU trimmer kicks in (default: **1000 records**). -* `cleanup_loop`: Runs every **5 minutes** to keep the manager healthy. +* **Cleanup Interval**: Runs every 30 minutes by default +* **Max Task Age**: Tasks older than 24 hours are automatically cleaned up +* **Thread Count**: Specify number of worker threads when calling `start_processing()` ## 🛠 Dependencies -* [nlohmann/json](https://github.com/nlohmann/json): For input/output data and progress reporting. -* **C++17** or higher. +* [nlohmann/json](https://github.com/nlohmann/json): For internal data handling +* **C++17** or higher diff --git a/VeloTaskConfig.cmake.in b/cmake/TaskFlowConfig.cmake.in similarity index 64% rename from VeloTaskConfig.cmake.in rename to cmake/TaskFlowConfig.cmake.in index 9df03e0..ae32297 100644 --- a/VeloTaskConfig.cmake.in +++ b/cmake/TaskFlowConfig.cmake.in @@ -5,4 +5,4 @@ include(CMakeFindDependencyMacro) find_dependency(Threads) find_dependency(nlohmann_json) -include("${CMAKE_CURRENT_LIST_DIR}/veloTaskTargets.cmake") +include("${CMAKE_CURRENT_LIST_DIR}/TaskFlowTargets.cmake") diff --git a/examples/basic_task_submission.cpp b/examples/basic_task_submission.cpp new file mode 100644 index 0000000..efcc579 --- /dev/null +++ b/examples/basic_task_submission.cpp @@ -0,0 +1,38 @@ +#include +#include + +int main() { + std::cout << "TaskFlow Example: Basic Task Submission" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Basic task submission + auto task = [] (taskflow::TaskCtx& ctx) { + std::cout << "Executing task: " << ctx.id << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ctx.success(); + }; + + auto id = manager.submit_task(task); + std::cout << "Submitted task: " << id << std::endl; + + // Wait for completion + while (true) { + auto state = manager.query_state(id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Task completed with state: " << static_cast(*state) << std::endl; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/examples/main.cpp b/examples/main.cpp deleted file mode 100644 index 2d16300..0000000 --- a/examples/main.cpp +++ /dev/null @@ -1,127 +0,0 @@ -#include -#include - -class ExampleTask : public velo::TaskBase { -public: - using TaskBase::TaskBase; - std::string getName() const override { return "ExampleTask"; } - std::string getDescription() const override { return "An example task"; } - - void execute() override { - std::cout << "Executing task: " << getTaskId() << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - finishExecute({{"result", "success"}}); - } - void cleanup() override {} -}; - -class FailingTask : public velo::TaskBase { -public: - using TaskBase::TaskBase; - std::string getName() const override { return "FailingTask"; } - std::string getDescription() const override { return "A task that may fail"; } - - void execute() override { - static int counter = 0; - counter++; - if (counter % 3 != 0) { // Fail 2 out of 3 times - std::cout << "Task " << getTaskId() << " failed, attempt " << counter << std::endl; - failExecute("Simulated failure"); - } else { - std::cout << "Task " << getTaskId() << " succeeded on attempt " << counter << std::endl; - finishExecute({{"result", "success"}}); - } - } - void cleanup() override {} -}; - -int main() { - std::cout << "VeloTask library examples" << std::endl; - - auto& manager = velo::TaskManager::getInstance(); - - // Register task types - manager.registerTaskType("example"); - - // Example 1: Basic task submission - std::cout << "\n1. Basic task submission:" << std::endl; - auto id1 = manager.submitTask("example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::normal); - std::cout << "Submitted task: " << id1 << std::endl; - - // Wait for completion - auto info = manager.getTaskInfo(id1); - while (info && info->state != velo::TaskState::success && info->state != velo::TaskState::failure) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - info = manager.getTaskInfo(id1); - } - std::cout << "Task completed with state: " << (int)info->state << std::endl; - - // Example 2: Batch submission - std::cout << "\n2. Batch task submission:" << std::endl; - std::vector> batchTasks = { - {"example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::high}, - {"example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::low}, - {"example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::normal} - }; - auto batchIds = manager.submitTasks(batchTasks); - std::cout << "Submitted " << batchIds.size() << " tasks in batch" << std::endl; - - // Wait for all to complete - bool allDone = false; - while (!allDone) { - allDone = true; - for (const auto& id : batchIds) { - auto taskInfo = manager.getTaskInfo(id); - if (taskInfo && (taskInfo->state == velo::TaskState::pending || taskInfo->state == velo::TaskState::running)) { - allDone = false; - break; - } - } - if (!allDone) std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } - std::cout << "All batch tasks completed" << std::endl; - - // Example 3: Retry mechanism - std::cout << "\n3. Retry mechanism:" << std::endl; - auto failingTask = std::make_shared("failing_task", velo::TaskLifecycle::disposable, velo::TaskPriority::normal, 2); - auto failId = manager.submitTask(failingTask, {}); - std::cout << "Submitted failing task: " << failId << std::endl; - - // Wait for completion - info = manager.getTaskInfo(failId); - while (info && info->state != velo::TaskState::success && info->state != velo::TaskState::failure) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - info = manager.getTaskInfo(failId); - } - std::cout << "Failing task final state: " << (int)info->state << std::endl; - - // Example 4: Task dependencies - std::cout << "\n4. Task dependencies:" << std::endl; - auto dep1 = manager.submitTask("example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::normal); - auto dep2 = manager.submitTask("example", {}, velo::TaskLifecycle::disposable, velo::TaskPriority::normal); - - // Create dependent task - auto dependentTask = std::make_shared("dependent_task"); - dependentTask->setDependencies({dep1, dep2}); - auto depId = manager.submitTask(dependentTask, {}); - - std::cout << "Submitted dependent task: " << depId << " (depends on " << dep1 << " and " << dep2 << ")" << std::endl; - - // Wait for dependent task - info = manager.getTaskInfo(depId); - while (info && info->state != velo::TaskState::success && info->state != velo::TaskState::failure) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - info = manager.getTaskInfo(depId); - } - std::cout << "Dependent task completed" << std::endl; - - // Example 5: Custom priority - std::cout << "\n5. Custom priority:" << std::endl; - auto customTask = std::make_shared("custom_priority_task"); - customTask->setPriority(static_cast(150)); // Higher than critical - auto customId = manager.submitTask(customTask, {}); - std::cout << "Submitted task with custom priority 150: " << customId << std::endl; - - std::cout << "\nAll examples completed!" << std::endl; - return 0; -} diff --git a/examples/multiple_tasks.cpp b/examples/multiple_tasks.cpp new file mode 100644 index 0000000..aaeeb77 --- /dev/null +++ b/examples/multiple_tasks.cpp @@ -0,0 +1,47 @@ +#include +#include +#include + +int main() { + std::cout << "TaskFlow Example: Multiple Tasks" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Multiple task submission + std::vector task_ids; + for (int i = 0; i < 3; ++i) { + auto task = [i] (taskflow::TaskCtx& ctx) { + std::cout << "Task " << i << " (ID: " << ctx.id << ") executing" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 25)); + ctx.success(); + }; + auto id = manager.submit_task(task); + task_ids.push_back(id); + std::cout << "Submitted task " << i << ": " << id << std::endl; + } + + // Wait for all to complete + bool all_done = false; + while (!all_done) { + all_done = true; + for (auto id : task_ids) { + auto state = manager.query_state(id); + if (!state || *state == taskflow::TaskState::running || *state == taskflow::TaskState::created) { + all_done = false; + break; + } + } + if (!all_done) std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + std::cout << "All tasks completed" << std::endl; + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/examples/persistent_task.cpp b/examples/persistent_task.cpp new file mode 100644 index 0000000..421c1ed --- /dev/null +++ b/examples/persistent_task.cpp @@ -0,0 +1,85 @@ +#include +#include + +int main() { + std::cout << "TaskFlow Example: Persistent Task Reawakening" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Counter to track reawakenings + static int reawaken_count = 0; + + auto persistent_task = [] (taskflow::TaskCtx& ctx) { + std::cout << "Persistent task " << ctx.id << " executing (count: " << ++reawaken_count << ")" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Store result for persistent task + nlohmann::json result = { + {"execution_count", reawaken_count}, + {"task_type", "persistent"} + }; + ctx.success_with_result(taskflow::ResultPayload::json(result)); + }; + + auto persistent_id = manager.submit_task(persistent_task, taskflow::TaskLifecycle::persistent); + std::cout << "Submitted persistent task: " << persistent_id << std::endl; + std::cout << "Is persistent: " << (manager.is_persistent_task(persistent_id) ? "Yes" : "No") << std::endl; + + // Wait for first execution + while (true) { + auto state = manager.query_state(persistent_id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Persistent task first execution completed" << std::endl; + + // Get first result + if (auto result = manager.get_result(persistent_id)) { + std::cout << "First execution result: " << result->data.json_data.dump() << std::endl; + } + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Reawaken the persistent task with new parameters + std::cout << "Reawakening persistent task..." << std::endl; + auto new_persistent_task = [] (taskflow::TaskCtx& ctx) { + std::cout << "Reawakened persistent task " << ctx.id << " with new logic!" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Store updated result + nlohmann::json result = { + {"execution_count", 2}, + {"task_type", "persistent_reawakened"}, + {"message", "This is the reawakened execution"} + }; + ctx.success_with_result(taskflow::ResultPayload::json(result)); + }; + + bool reawaken_success = manager.reawaken_task(persistent_id, new_persistent_task); + std::cout << "Reawaken request: " << (reawaken_success ? "Success" : "Failed") << std::endl; + + // Wait for reawakened execution + while (true) { + auto state = manager.query_state(persistent_id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Persistent task reawakened execution completed" << std::endl; + + // Get updated result + if (auto result = manager.get_result(persistent_id)) { + std::cout << "Reawakened execution result: " << result->data.json_data.dump() << std::endl; + } + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/examples/task_types.hpp b/examples/task_types.hpp new file mode 100644 index 0000000..83fff0d --- /dev/null +++ b/examples/task_types.hpp @@ -0,0 +1,77 @@ +#pragma once + +#include +#include + +// Example task types with different observability levels +struct NoObservationTask { + static constexpr taskflow::TaskObservability observability = taskflow::TaskObservability::none; + static constexpr bool cancellable = false; + + void operator()(taskflow::TaskCtx& ctx) const { + std::cout << "No observation task executing" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ctx.success(); + } +}; + +struct BasicObservationTask { + static constexpr taskflow::TaskObservability observability = taskflow::TaskObservability::basic; + static constexpr bool cancellable = false; + + void operator()(taskflow::TaskCtx& ctx) const { + std::cout << "Basic observation task executing" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ctx.success(); + } +}; + +struct ProgressObservationTask { + static constexpr taskflow::TaskObservability observability = taskflow::TaskObservability::progress; + static constexpr bool cancellable = false; + + void operator()(taskflow::TaskCtx& ctx) const { + std::cout << "Progress observation task executing" << std::endl; + ctx.report_progress(0.5f, "Halfway done"); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ctx.success(); + } +}; + +struct CancellableTask { + static constexpr taskflow::TaskObservability observability = taskflow::TaskObservability::basic; + static constexpr bool cancellable = true; + + void operator()(taskflow::TaskCtx& ctx) const { + std::cout << "Cancellable task executing" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ctx.success(); + } +}; + +// Custom task with progress_info trait detection +struct CustomProgressTask { + // Indicate that this task has custom progress info + static constexpr bool progress_info = true; + + void operator()(taskflow::TaskCtx& ctx) const { + std::cout << "Custom progress task executing" << std::endl; + + // Report progress with custom data structure + struct CustomProgress { + int current_step; + int total_steps; + char phase[32]; + double percentage; + }; + + ctx.report_progress(CustomProgress{10, 100, "starting", 10.0}); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + ctx.report_progress(CustomProgress{50, 100, "processing", 50.0}); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + ctx.report_progress(CustomProgress{100, 100, "completed", 100.0}); + ctx.success(); + } +}; diff --git a/examples/task_with_failure.cpp b/examples/task_with_failure.cpp new file mode 100644 index 0000000..3233ac1 --- /dev/null +++ b/examples/task_with_failure.cpp @@ -0,0 +1,40 @@ +#include +#include + +int main() { + std::cout << "TaskFlow Example: Task with Failure" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Task with failure + auto failing_task = [] (taskflow::TaskCtx& ctx) { + std::cout << "Task " << ctx.id << " failing" << std::endl; + ctx.failure("Simulated failure"); + }; + + auto fail_id = manager.submit_task(failing_task); + std::cout << "Submitted failing task: " << fail_id << std::endl; + + // Wait for completion + while (true) { + auto state = manager.query_state(fail_id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Failing task completed with state: " << static_cast(*state) << std::endl; + if (auto error = manager.get_error(fail_id)) { + std::cout << "Error message: " << *error << std::endl; + } + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/examples/task_with_progress.cpp b/examples/task_with_progress.cpp new file mode 100644 index 0000000..278d9be --- /dev/null +++ b/examples/task_with_progress.cpp @@ -0,0 +1,43 @@ +#include +#include + +int main() { + std::cout << "TaskFlow Example: Task with Progress" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Task with progress + auto progress_task = [] (taskflow::TaskCtx& ctx) { + for (int i = 0; i <= 100; i += 25) { + ctx.report_progress(static_cast(i) / 100.0f, "Processing step " + std::to_string(i)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + ctx.success(); + }; + + auto progress_id = manager.submit_task(progress_task); + std::cout << "Submitted progress task: " << progress_id << std::endl; + + // Monitor progress + while (true) { + auto state = manager.query_state(progress_id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Progress task completed with state: " << static_cast(*state) << std::endl; + break; + } + if (auto progress = manager.get_progress(progress_id)) { + std::cout << "Progress: " << progress->first * 100.0f << "% - " << progress->second << std::endl; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/examples/task_with_result.cpp b/examples/task_with_result.cpp new file mode 100644 index 0000000..47b1e2e --- /dev/null +++ b/examples/task_with_result.cpp @@ -0,0 +1,58 @@ +#include +#include + +int main() { + std::cout << "TaskFlow Example: Task with Custom Result Storage" << std::endl; + + // Get the task manager instance + auto& manager = taskflow::TaskManager::getInstance(); + + // Start processing with 4 threads + manager.start_processing(4); + + // Task with custom result storage + auto result_task = [] (taskflow::TaskCtx& ctx) { + std::cout << "Task " << ctx.id << " storing custom result" << std::endl; + + // Store different types of results + nlohmann::json result = { + {"task_id", ctx.id}, + {"status", "completed"}, + {"timestamp", std::chrono::system_clock::now().time_since_epoch().count()}, + {"data", { + {"processed_items", 42}, + {"success_rate", 0.95} + }} + }; + + ctx.success_with_result(taskflow::ResultPayload::json(result)); + }; + + auto result_id = manager.submit_task(result_task); + std::cout << "Submitted result task: " << result_id << std::endl; + + // Wait for completion and get result + while (true) { + auto state = manager.query_state(result_id); + if (state && *state != taskflow::TaskState::running && *state != taskflow::TaskState::created) { + std::cout << "Result task completed with state: " << static_cast(*state) << std::endl; + + // Retrieve and display the result + if (auto result = manager.get_result(result_id)) { + if (result->kind == taskflow::ResultKind::json) { + std::cout << "Result JSON: " << result->data.json_data.dump(2) << std::endl; + } + } else { + std::cout << "No result found" << std::endl; + } + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Stop processing + manager.stop_processing(); + + std::cout << "Example completed!" << std::endl; + return 0; +} diff --git a/include/taskflow/any_task.hpp b/include/taskflow/any_task.hpp new file mode 100644 index 0000000..c1b6107 --- /dev/null +++ b/include/taskflow/any_task.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include + +#include "task_ctx.hpp" +#include "task_traits.hpp" + +namespace taskflow { + +struct AnyTask { + TaskID id; + TaskLifecycle lifecycle{TaskLifecycle::disposable}; + std::unique_ptr storage; + void (*invoke)(void*, TaskRuntimeCtx&, ResultStorage*); + + AnyTask() : id(0), storage(nullptr, nullptr), invoke(nullptr) {} + template + AnyTask(TaskID task_id, Task task, TaskLifecycle lifecycle_type = TaskLifecycle::disposable, + typename std::enable_if, int>::type = 0) + : id(task_id), + lifecycle(lifecycle_type), + storage(new Task(std::move(task)), [](void* ptr) { delete static_cast(ptr); }), + invoke([](void* ptr, TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + Task& task_ref = *static_cast(ptr); + execute(task_ref, rctx, result_storage); + }) {} + + AnyTask(AnyTask&& other) noexcept : id(other.id), storage(std::move(other.storage)), invoke(other.invoke) { + other.invoke = nullptr; + } + AnyTask& operator=(AnyTask&& other) noexcept { + if (this != &other) { + id = other.id; + storage = std::move(other.storage); + invoke = other.invoke; + other.invoke = nullptr; + } + return *this; + } + + void execute_task(TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + if (invoke && storage) { + invoke(storage.get(), rctx, result_storage); + } + } + + [[nodiscard]] bool valid() const { return invoke != nullptr && storage != nullptr; } + + void reset() { + id = 0; + storage.reset(); + invoke = nullptr; + } +}; + +template +typename std::enable_if, AnyTask>::type make_any_task(TaskID id, Task task, TaskLifecycle lifecycle = TaskLifecycle::disposable) { + return AnyTask(id, std::move(task), lifecycle); +} + +template +typename std::enable_if, void>::type execute(Task& task, TaskRuntimeCtx& rctx); + +template +typename std::enable_if, void>::type execute(Task&& task, TaskRuntimeCtx& rctx); + +} // namespace taskflow diff --git a/include/taskflow/state_storage.hpp b/include/taskflow/state_storage.hpp new file mode 100644 index 0000000..a34dbf4 --- /dev/null +++ b/include/taskflow/state_storage.hpp @@ -0,0 +1,179 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "task_traits.hpp" + +namespace taskflow { + +class StateStorage { + public: + void set_state(TaskID id, TaskState state) { + std::unique_lock lock(mutex_); + states_[id] = state; + timestamps_[id] = std::chrono::system_clock::now(); + } + + [[nodiscard]] std::optional get_state(TaskID id) const { + std::shared_lock lock(mutex_); + auto it = states_.find(id); + return it != states_.end() ? std::optional(it->second) : std::nullopt; + } + + [[nodiscard]] bool has_task(TaskID id) const { + std::shared_lock lock(mutex_); + return states_.find(id) != states_.end(); + } + + void remove_task(TaskID id) { + std::unique_lock lock(mutex_); + states_.erase(id); + timestamps_.erase(id); + progress_info_.erase(id); + error_messages_.erase(id); + result_locators_.erase(id); + } + + // Template method to set progress with custom type + template + void set_progress(TaskID id, ProgressType progress_info) { + static_assert(is_valid_progress_type::value, + "\n==============================================================================\n" + " [TaskFlow Error]: Invalid ProgressType detected!\n" + " ------------------------------------------------------------------------------\n" + " 1. SIZE LIMIT: Progress objects must be <= 1024 bytes (1KB) to ensure low-latency.\n" + " 2. TYPE LIMIT: Only trivially copyable structs, std::string, or nlohmann::json are allowed.\n" + " 3. RECOMMENDATION: Do not attach large business data here. Use ResultStorage instead.\n" + "=============================================================================="); + std::unique_lock lock(mutex_); + progress_info_[id] = std::make_any(std::move(progress_info)); + timestamps_[id] = std::chrono::system_clock::now(); + } + + // Template method to get progress with custom type + template + [[nodiscard]] std::optional get_progress(TaskID id) const { + std::shared_lock lock(mutex_); + auto it = progress_info_.find(id); + if (it != progress_info_.end()) { + try { + return std::any_cast(it->second); + } catch (const std::bad_any_cast&) { + return std::nullopt; + } + } + return std::nullopt; + } + + // Backward compatibility method + void set_progress(TaskID id, float progress, const std::string& message = "") { + std::unique_lock lock(mutex_); + progress_info_[id] = std::make_any>(progress, message); + timestamps_[id] = std::chrono::system_clock::now(); + } + + void set_error(TaskID id, const std::string& message) { + std::unique_lock lock(mutex_); + error_messages_[id] = message; + } + + [[nodiscard]] std::optional get_error(TaskID id) const { + std::shared_lock lock(mutex_); + auto it = error_messages_.find(id); + return it != error_messages_.end() ? std::optional(it->second) : std::nullopt; + } + + void set_result_locator(TaskID id, ResultLocator locator) { + std::unique_lock lock(mutex_); + result_locators_[id] = locator; + } + + [[nodiscard]] std::optional get_result_locator(TaskID id) const { + std::shared_lock lock(mutex_); + auto it = result_locators_.find(id); + return it != result_locators_.end() ? std::optional(it->second) : std::nullopt; + } + + [[nodiscard]] std::optional get_timestamp(TaskID id) const { + std::shared_lock lock(mutex_); + auto it = timestamps_.find(id); + return it != timestamps_.end() ? std::optional(it->second) : std::nullopt; + } + + [[nodiscard]] std::vector get_all_task_ids() const { + std::shared_lock lock(mutex_); + std::vector ids; + ids.reserve(states_.size()); + for (const auto& [id, _] : states_) { + ids.push_back(id); + } + return ids; + } + + struct Statistics { + size_t total_tasks{0}; + size_t running_tasks{0}; + size_t completed_tasks{0}; + size_t failed_tasks{0}; + }; + + [[nodiscard]] Statistics get_statistics() const { + std::shared_lock lock(mutex_); + Statistics stats; + stats.total_tasks = states_.size(); + + for (const auto& [_, state] : states_) { + switch (state) { + case TaskState::running: + stats.running_tasks++; + break; + case TaskState::success: + stats.completed_tasks++; + break; + case TaskState::failure: + stats.failed_tasks++; + break; + default: + break; + } + } + + return stats; + } + + void cleanup_old_tasks(std::chrono::hours max_age) { + auto now = std::chrono::system_clock::now(); + auto cutoff = now - max_age; + + std::unique_lock lock(mutex_); + for (auto it = timestamps_.begin(); it != timestamps_.end();) { + if (it->second < cutoff) { + auto id = it->first; + states_.erase(id); + progress_info_.erase(id); + error_messages_.erase(id); + it = timestamps_.erase(it); + } else { + ++it; + } + } + } + + private: + mutable std::shared_mutex mutex_; + + std::unordered_map states_; + std::unordered_map timestamps_; + std::unordered_map progress_info_; + std::unordered_map error_messages_; + std::unordered_map result_locators_; +}; + +} // namespace taskflow diff --git a/include/taskflow/task_ctx.hpp b/include/taskflow/task_ctx.hpp new file mode 100644 index 0000000..b21b9df --- /dev/null +++ b/include/taskflow/task_ctx.hpp @@ -0,0 +1,123 @@ +#pragma once + +#include +#include + +#include "state_storage.hpp" +#include "task_traits.hpp" + +namespace taskflow { + +// Forward declarations +class StateStorage; + +// TaskCtx - Friendly API for task lifecycle management +struct TaskCtx { + TaskID id; + StateStorage* states; + ResultStorage* result_storage{nullptr}; // Optional result storage + std::function cancellation_checker; // For cancellation checking + + // Lifecycle management + void begin(); + void update_progress(float progress, const std::string& message = ""); + void success(); + void success_with_result(ResultPayload result); + void failure(const std::string& error_message = ""); + void failure_with_result(const std::string& error_message, ResultPayload result); + + // Cancellation support - check with callback if available + [[nodiscard]] bool is_cancelled() const { return cancellation_checker ? cancellation_checker(id) : false; } + + // Progress reporting - accepts any type defined by traits + template + void report_progress(const ProgressType& progress_info) { + if (states) { + states->set_progress(id, progress_info); + } + } + + // Backward compatibility + void report_progress(float progress, const std::string& message = "") { + if (states) { + states->set_progress(id, progress, message); + } + } + + // State queries + [[nodiscard]] TaskState current_state() const; + [[nodiscard]] bool is_running() const { return current_state() == TaskState::running; } + [[nodiscard]] bool is_completed() const { + auto state = current_state(); + return state == TaskState::success || state == TaskState::failure; + } +}; + +// Task execution context +struct TaskRuntimeCtx { + TaskID id; + StateStorage* states; + std::function cancellation_checker; + + explicit TaskRuntimeCtx(TaskID task_id, StateStorage* storage, std::function checker = nullptr) + : id(task_id), states(storage), cancellation_checker(checker) {} +}; + +} // namespace taskflow + +// TaskCtx implementation +inline void taskflow::TaskCtx::begin() { + if (states) { + states->set_state(id, taskflow::TaskState::running); + } +} + +inline void taskflow::TaskCtx::update_progress(float progress, const std::string& message) { + if (states) { + states->set_progress(id, progress, message); + } +} + +inline void taskflow::TaskCtx::success() { + if (states) { + states->set_state(id, taskflow::TaskState::success); + } +} + +inline void taskflow::TaskCtx::failure(const std::string& error_message) { + if (states) { + states->set_state(id, taskflow::TaskState::failure); + states->set_error(id, error_message); + } +} + + + +inline taskflow::TaskState taskflow::TaskCtx::current_state() const { + if (states) { + auto state = states->get_state(id); + return state.value_or(taskflow::TaskState::created); + } + return taskflow::TaskState::created; +} + +inline void taskflow::TaskCtx::success_with_result(taskflow::ResultPayload result) { + if (result_storage && states) { + auto locator = result_storage->store_result(std::move(result)); + states->set_result_locator(id, locator); + states->set_state(id, taskflow::TaskState::success); + } else { + success(); + } +} + +inline void taskflow::TaskCtx::failure_with_result(const std::string& error_message, taskflow::ResultPayload result) { + if (result_storage && states) { + auto locator = result_storage->store_result(std::move(result)); + states->set_result_locator(id, locator); + states->set_state(id, taskflow::TaskState::failure); + states->set_error(id, error_message); + } else { + failure(error_message); + } +} diff --git a/include/taskflow/task_manager.hpp b/include/taskflow/task_manager.hpp new file mode 100644 index 0000000..9daf6fa --- /dev/null +++ b/include/taskflow/task_manager.hpp @@ -0,0 +1,305 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "any_task.hpp" +#include "state_storage.hpp" +#include "task_traits.hpp" +#include "threadpool.hpp" + +namespace taskflow { + +class TaskManager { + public: + static TaskManager& getInstance() { + static TaskManager instance; + return instance; + } + + ~TaskManager() { + stop_cleanup_ = true; + cleanup_cv_.notify_all(); // Wake up cleanup thread + if (cleanup_thread_.joinable()) { + cleanup_thread_.join(); + } + } + + TaskManager(const TaskManager&) = delete; + TaskManager& operator=(const TaskManager&) = delete; + + // Submit task for execution + template + TaskID submit_task(Task task, TaskLifecycle lifecycle = TaskLifecycle::disposable) { + TaskID id = generate_task_id(); + + // Initialize state + states_.set_state(id, TaskState::created); + + // Store persistent tasks for later reawakening + if (lifecycle == TaskLifecycle::persistent) { + std::unique_lock lock(persistent_tasks_mutex_); + persistent_tasks_[id] = std::make_unique(make_any_task(id, std::move(task), lifecycle)); + } + + // Submit to thread pool for execution + if (thread_pool_) { + thread_pool_->execute([this, id, task = std::move(task), lifecycle]() mutable { + AnyTask any_task = make_any_task(id, std::move(task), lifecycle); + if (any_task.valid()) { + TaskRuntimeCtx rctx{id, &states_, [this](TaskID tid) { return is_task_cancelled(tid); }}; + any_task.execute_task(rctx, result_storage_.get()); + } + }); + } + + return id; + } + + // Reawaken a persistent task with new parameters + template + bool reawaken_task(TaskID id, Task new_task) { + std::unique_lock lock(persistent_tasks_mutex_); + auto it = persistent_tasks_.find(id); + if (it == persistent_tasks_.end()) { + return false; // Task not found or not persistent + } + + // Reset task state for reawakening + states_.set_state(id, TaskState::created); + + // Update the stored task with new parameters + *it->second = make_any_task(id, std::move(new_task), TaskLifecycle::persistent); + + // Submit for re-execution + if (thread_pool_) { + thread_pool_->execute([this, id]() { + std::unique_lock lock(persistent_tasks_mutex_); + auto it = persistent_tasks_.find(id); + if (it != persistent_tasks_.end() && it->second->valid()) { + TaskRuntimeCtx rctx{id, &states_, [this](TaskID tid) { return is_task_cancelled(tid); }}; + it->second->execute_task(rctx, result_storage_.get()); + } + }); + } + + return true; + } + + // Check if a task is persistent + [[nodiscard]] bool is_persistent_task(TaskID id) const { + std::shared_lock lock(persistent_tasks_mutex_); + auto it = persistent_tasks_.find(id); + return it != persistent_tasks_.end(); + } + + // Query task state + [[nodiscard]] std::optional query_state(TaskID id) const { return states_.get_state(id); } + + // Get task progress - template version for custom types + template + [[nodiscard]] std::optional get_progress(TaskID id) const { + return states_.get_progress(id); + } + + // Backward compatibility + [[nodiscard]] std::optional> get_progress(TaskID id) const { + return states_.get_progress>(id); + } + + // Get task error message + [[nodiscard]] std::optional get_error(TaskID id) const { return states_.get_error(id); } + + // Get task result + [[nodiscard]] std::optional get_result(TaskID id) const { + if (auto locator = states_.get_result_locator(id)) { + return result_storage_->get_result(*locator); + } + return std::nullopt; + } + + // Cancel task - sets cancellation flag, task handles it internally + bool cancel_task(TaskID id) { + // Store cancellation flag in a thread-safe way + std::unique_lock lock(cancellation_mutex_); + cancelled_tasks_[id] = true; + return states_.has_task(id); + } + + // Check if task is cancelled + [[nodiscard]] bool is_task_cancelled(TaskID id) const { + std::shared_lock lock(cancellation_mutex_); + auto it = cancelled_tasks_.find(id); + return it != cancelled_tasks_.end() && it->second; + } + + // Get statistics + [[nodiscard]] StateStorage::Statistics get_statistics() const { return states_.get_statistics(); } + + // Cleanup completed tasks + void cleanup_completed_tasks(std::chrono::hours max_age = std::chrono::hours(24)) { + states_.cleanup_old_tasks(max_age); + } + + // Start task processing (typically called once) + void start_processing(size_t num_threads = std::thread::hardware_concurrency()) { + if (processing_started_) return; + + processing_started_ = true; + thread_pool_ = std::make_unique(num_threads); + } + + // Stop task processing + void stop_processing() { + thread_pool_.reset(); + processing_started_ = false; + } + + private: + TaskManager() : cleanup_thread_([this] { cleanup_loop(); }), result_storage_(std::make_unique()) {} + + TaskID generate_task_id() { + static std::atomic next_id{1}; + return next_id.fetch_add(1, std::memory_order_relaxed); + } + + void cleanup_loop() { + while (!stop_cleanup_.load(std::memory_order_acquire)) { + // Wait for either timeout or stop signal + { + std::unique_lock lock(cleanup_mutex_); + cleanup_cv_.wait_for(lock, std::chrono::minutes(30), + [this] { return stop_cleanup_.load(std::memory_order_acquire); }); + } + + if (!stop_cleanup_.load(std::memory_order_acquire)) { + cleanup_completed_tasks(std::chrono::hours(1)); + } + } + } + + // Task processing + std::unique_ptr thread_pool_; + std::atomic processing_started_{false}; + + // State management + StateStorage states_; + + // Cancellation management + mutable std::shared_mutex cancellation_mutex_; + std::unordered_map cancelled_tasks_; + + // Persistent tasks storage + mutable std::shared_mutex persistent_tasks_mutex_; + std::unordered_map> persistent_tasks_; + + // Result storage + std::unique_ptr result_storage_; + + // Cleanup + std::thread cleanup_thread_; + std::mutex cleanup_mutex_; + std::condition_variable cleanup_cv_; + std::atomic stop_cleanup_{false}; +}; + +// Execution algorithm - State machine for task lifecycle +template +typename std::enable_if, void>::type execute_task(Task& task, TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + TaskCtx ctx{rctx.id, rctx.states, result_storage, rctx.cancellation_checker}; + + try { + // Begin execution + ctx.begin(); + + // Execute the task + task(ctx); + + // If not cancelled and not failed, mark as success + if (!ctx.is_cancelled() && !ctx.is_completed()) { + ctx.success(); + } + } catch (const std::exception& e) { + // Handle exceptions + ctx.failure(e.what()); + } catch (...) { + // Handle unknown exceptions + ctx.failure("unknown_exception"); + } +} + +// Execute task with cancellation support +template +typename std::enable_if, void>::type execute_cancellable_task(Task& task, + TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + TaskCtx ctx{rctx.id, rctx.states, result_storage, rctx.cancellation_checker}; + + try { + ctx.begin(); + + // For cancellable tasks, task itself handles cancellation + task(ctx); + + // Task completed normally or handled cancellation internally + if (!ctx.is_completed()) { + ctx.success(); + } + } catch (const std::exception& e) { + ctx.failure(e.what()); + } catch (...) { + ctx.failure("unknown_exception"); + } +} + +// Execute task with observation +template +typename std::enable_if, void>::type execute_observable_task(Task& task, + TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + TaskCtx ctx{rctx.id, rctx.states, result_storage, rctx.cancellation_checker}; + + try { + ctx.begin(); + + task(ctx); + + if (!ctx.is_cancelled() && !ctx.is_completed()) { + ctx.success(); + } + } catch (const std::exception& e) { + ctx.failure(e.what()); + } catch (...) { + ctx.failure("unknown_exception"); + } +} + +// Generic task execution dispatcher +template +typename std::enable_if, void>::type execute(Task& task, TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + if constexpr (is_cancellable_task_v && is_progress_observable_task_v) { + execute_observable_task(task, rctx, result_storage); + } else if constexpr (is_cancellable_task_v && is_basic_observable_task_v) { + execute_cancellable_task(task, rctx, result_storage); + } else if constexpr (is_progress_observable_task_v) { + execute_observable_task(task, rctx, result_storage); + } else if constexpr (is_basic_observable_task_v) { + execute_task(task, rctx, result_storage); + } else if constexpr (is_cancellable_task_v) { + execute_cancellable_task(task, rctx, result_storage); + } else { + execute_task(task, rctx, result_storage); + } +} + +// Execute task by value (for rvalue tasks) +template +typename std::enable_if, void>::type execute(Task&& task, TaskRuntimeCtx& rctx, ResultStorage* result_storage) { + Task task_copy = std::forward(task); + execute(task_copy, rctx, result_storage); +} + +} // namespace taskflow diff --git a/include/taskflow/task_traits.hpp b/include/taskflow/task_traits.hpp new file mode 100644 index 0000000..baa58b3 --- /dev/null +++ b/include/taskflow/task_traits.hpp @@ -0,0 +1,310 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace taskflow { + +namespace detail { + +// Extract type name from function signature +template +constexpr std::string_view type_name_impl() { +#if defined(__clang__) + constexpr std::string_view prefix = "std::string_view taskflow::detail::type_name_impl() [T = "; + constexpr std::string_view suffix = "]"; +#elif defined(__GNUC__) + constexpr std::string_view prefix = "constexpr std::string_view taskflow::detail::type_name_impl() [with T = "; + constexpr std::string_view suffix = "; std::string_view = std::basic_string_view]"; +#elif defined(_MSC_VER) + constexpr std::string_view prefix = "taskflow::detail::type_name_impl<"; + constexpr std::string_view suffix = ">(void)"; +#else + constexpr std::string_view prefix = ""; + constexpr std::string_view suffix = ""; +#endif + + constexpr std::string_view function = __PRETTY_FUNCTION__; + const size_t start = function.find(prefix) + prefix.size(); + const size_t end = function.find(suffix, start); + + if (start == std::string_view::npos || end == std::string_view::npos || start >= end) { + return "unknown_type"; + } + + return function.substr(start, end - start); +} + +} // namespace detail + +// Get type name as string_view +template +constexpr std::string_view type_name() { + return detail::type_name_impl(); +} + +// For C++23 compatibility +#if __cplusplus >= 202302L +using std::type_name; +#endif + +// Forward declarations +struct TaskCtx; +class StateStorage; + +// Task state enumeration +enum class TaskState : std::uint8_t { + created, + running, + success, + failure +}; + +// Task priority enumeration +enum class TaskPriority : std::uint8_t { + lowest = 0, + low = 25, + normal = 50, + high = 75, + critical = 100, +}; + +// Task Lifecycle enumeration +enum class TaskLifecycle : std::uint8_t { + disposable, + persistent, +}; + +// Task Observability enumeration +enum class TaskObservability : std::uint8_t { + none, // No observation capabilities + basic, // Basic observation (start/end states only) + progress // Full observation (states + progress reporting) +}; + +// Task ID type +using TaskID = std::uint64_t; + +// Result ID type +using ResultID = std::uint64_t; + +// Result kind enumeration +enum class ResultKind : std::uint8_t { + none, // No result + json, // JSON data + file, // File path + text, // Plain text + binary, // Binary data + custom // Custom result type +}; + +// Result locator for referencing stored results +struct ResultLocator { + ResultID id{0}; + ResultKind kind{ResultKind::none}; + std::string metadata; // Additional metadata (optional) + + ResultLocator() = default; + ResultLocator(ResultID result_id, ResultKind result_kind, std::string meta = "") + : id(result_id), kind(result_kind), metadata(std::move(meta)) {} + + [[nodiscard]] bool valid() const { return id != 0; } + + // Comparison operators for use in std::map + bool operator<(const ResultLocator& other) const { + if (id != other.id) return id < other.id; + if (static_cast(kind) != static_cast(other.kind)) { + return static_cast(kind) < static_cast(other.kind); + } + return metadata < other.metadata; + } + + bool operator==(const ResultLocator& other) const { + return id == other.id && kind == other.kind && metadata == other.metadata; + } +}; + +// Result payload variants +struct ResultPayload { + ResultKind kind{ResultKind::none}; + + // Union-like storage for different result types + struct { + nlohmann::json json_data; + std::string text_data; + std::string file_path; + std::vector binary_data; + std::string custom_data; + } data; + + ResultPayload() = default; + + // JSON result + static ResultPayload json(const nlohmann::json& j) { + ResultPayload p; + p.kind = ResultKind::json; + p.data.json_data = j; + return p; + } + + // Text result + static ResultPayload text(std::string text) { + ResultPayload p; + p.kind = ResultKind::text; + p.data.text_data = std::move(text); + return p; + } + + // File result + static ResultPayload file(std::string path) { + ResultPayload p; + p.kind = ResultKind::file; + p.data.file_path = std::move(path); + return p; + } + + // Binary result + static ResultPayload binary(std::vector data) { + ResultPayload p; + p.kind = ResultKind::binary; + p.data.binary_data = std::move(data); + return p; + } + + // Custom result + static ResultPayload custom(std::string data) { + ResultPayload p; + p.kind = ResultKind::custom; + p.data.custom_data = std::move(data); + return p; + } +}; + +// Result storage interface for tasks +class ResultStorage { + public: + virtual ~ResultStorage() = default; + + // Store result and return locator + virtual ResultLocator store_result(ResultPayload payload) = 0; + + // Retrieve result by locator + virtual std::optional get_result(const ResultLocator& locator) const = 0; + + // Remove result + virtual bool remove_result(const ResultLocator& locator) = 0; + + // Generate next result ID + virtual ResultID next_result_id() = 0; +}; + +// Simple in-memory result storage implementation +class SimpleResultStorage : public ResultStorage { + public: + SimpleResultStorage() = default; + + ResultLocator store_result(ResultPayload payload) override { + std::unique_lock lock(mutex_); + ResultID id = next_id_++; + ResultLocator locator{id, payload.kind}; + results_[locator] = std::move(payload); + return locator; + } + + std::optional get_result(const ResultLocator& locator) const override { + std::shared_lock lock(mutex_); + auto it = results_.find(locator); + return it != results_.end() ? std::optional(it->second) : std::nullopt; + } + + bool remove_result(const ResultLocator& locator) override { + std::unique_lock lock(mutex_); + return results_.erase(locator) > 0; + } + + ResultID next_result_id() override { + std::unique_lock lock(mutex_); + return next_id_++; + } + + private: + mutable std::shared_mutex mutex_; + std::map results_; + ResultID next_id_{1}; +}; + +// Default task traits +template +struct task_traits { + // metadata + static constexpr std::string_view name = taskflow::type_name(); + static constexpr std::string_view description = ""; + static constexpr TaskPriority priority = TaskPriority::normal; + + // capabilities + static constexpr bool cancellable = false; + static constexpr TaskObservability observability = TaskObservability::basic; +}; + +// Task trait detection using SFINAE +template +struct is_task : std::false_type {}; + +template +struct is_task()(std::declval()))>> : std::true_type {}; + +template +constexpr bool is_task_v = is_task::value; + +// Cancellable task trait +template +struct is_cancellable_task : std::conjunction, std::bool_constant::cancellable>> {}; + +template +constexpr bool is_cancellable_task_v = is_cancellable_task::value; + +// Basic observable task trait (supports basic observation) +template +struct is_basic_observable_task + : std::conjunction, std::bool_constant::observability == TaskObservability::basic || + task_traits::observability == TaskObservability::progress>> {}; + +template +constexpr bool is_basic_observable_task_v = is_basic_observable_task::value; + +// Progress observable task trait (supports progress reporting) +template +struct is_progress_observable_task + : std::conjunction, std::bool_constant::observability == TaskObservability::progress>> {}; + +template +constexpr bool is_progress_observable_task_v = is_progress_observable_task::value; + +// Legacy observable task trait (for backward compatibility) +template +struct is_observable_task + : std::conjunction, std::bool_constant::observability != TaskObservability::none>> {}; + +template +constexpr bool is_observable_task_v = is_observable_task::value; + +template +struct is_valid_progress_type { + static constexpr bool value = []() { + if constexpr (std::is_arithmetic_v) return true; + if constexpr (std::is_same_v || std::is_same_v) return true; + if constexpr (std::is_trivially_copyable_v && sizeof(T) <= 512) return true; + + return false; + }(); +}; + +} // namespace taskflow diff --git a/include/velotask/threadpool.hpp b/include/taskflow/threadpool.hpp similarity index 100% rename from include/velotask/threadpool.hpp rename to include/taskflow/threadpool.hpp diff --git a/include/velotask/taskbase.hpp b/include/velotask/taskbase.hpp deleted file mode 100644 index a03d972..0000000 --- a/include/velotask/taskbase.hpp +++ /dev/null @@ -1,275 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace velo { - -using json = nlohmann::json; - -enum class TaskLifecycle : std::uint8_t { - disposable, - persistent, -}; - -enum class TaskState : std::uint8_t { - pending, - running, - success, - failure, -}; - -enum class TaskPriority : std::uint8_t { - lowest = 0, - low = 25, - normal = 50, - high = 75, - critical = 100, - // Allow custom values up to 255 -}; - -static inline std::string generateTaskId() { - // XXXXXXXX-XXXX-4XXX-YXXX-XXXXXXXXXXXX - static thread_local std::random_device rnd; - static thread_local std::mt19937_64 gen(rnd()); - std::uniform_int_distribution dis; - - constexpr uint64_t uuidV4VersionMask = 0xffffffffffff0fffULL; - constexpr uint64_t uuidV4VersionBits = 0x0000000000004000ULL; - constexpr uint64_t uuidV4VariantMask = 0x3fffffffffffffffULL; - constexpr uint64_t uuidV4VariantBits = 0x8000000000000000ULL; - - constexpr uint64_t mask16Bit = 0xffffU; - constexpr uint64_t mask48Bit = 0xffffffffffffULL; - - uint64_t part1 = dis(gen); - uint64_t part2 = dis(gen); - - part1 = (part1 & uuidV4VersionMask) | uuidV4VersionBits; - part2 = (part2 & uuidV4VariantMask) | uuidV4VariantBits; - - std::stringstream oss; - oss << std::hex << std::setfill('0') << std::uppercase; - - oss << std::setw(8) << (part1 >> 32); - oss << "-"; - oss << std::setw(4) << ((part1 >> 16) & mask16Bit); - oss << "-"; - oss << std::setw(4) << (part1 & 0xffff); - oss << "-"; - oss << std::setw(4) << (part2 >> 48); - oss << "-"; - oss << std::setw(12) << (part2 & mask48Bit); - - return oss.str(); -} - -struct TaskResult { - TaskState state{TaskState::pending}; - json data; - std::string errorMessage; - - [[nodiscard]] bool isSuccess() const { return state == TaskState::success; } - [[nodiscard]] bool hasError() const { return state == TaskState::failure; } -}; - -class TaskObserver { - public: - virtual ~TaskObserver() = default; - virtual void onTaskStarted(const std::string& taskId, const json& input) = 0; - virtual void onTaskCompleted(const std::string& taskId, const TaskResult& result) = 0; - virtual void onTaskProgress(const std::string& taskId, const json& progressInfo) = 0; - virtual void onTaskError(const std::string& taskId, const std::string& errorMessage) = 0; -}; - -class TaskBase { - public: - explicit TaskBase(std::string_view taskId, TaskLifecycle lifecycle = TaskLifecycle::disposable, - TaskPriority priority = TaskPriority::normal, int maxRetries = 0) - : taskId_(taskId), lifecycle_(lifecycle), priority_(priority), maxRetries_(maxRetries) {} - - virtual ~TaskBase() = default; - - virtual std::string getName() const = 0; - virtual std::string getDescription() const = 0; - virtual void execute() = 0; - virtual void cleanup() = 0; - - std::string getTaskId() const { return taskId_; } - TaskState getState() const { return state_.load(std::memory_order_acquire); } - TaskLifecycle getLifecycle() const { return lifecycle_; } - json getInput() const { return input_; } - std::chrono::system_clock::time_point getStartTime() const { return startTime_; } - std::chrono::system_clock::time_point getEndTime() const { return endTime_; } - TaskPriority getPriority() const { return priority_; } - const TaskResult& getResult() const { return result_; } - - void setObserver(TaskObserver* observer) { observer_ = observer; } - void setInput(const json& input) { input_ = input; } - void setTaskId(std::string_view tid) { taskId_ = tid; } - void setPriority(TaskPriority priority) { priority_ = priority; } - void setDependencies(const std::vector& deps) { dependencies_ = deps; } - const std::vector& getDependencies() const { return dependencies_; } - - void cancel() { - cancelRequested_.store(true, std::memory_order_release); - cv_.notify_all(); - } - - bool restart(const json& newInput = {}) { - std::unique_lock lock(mutex_); - if (state_ == TaskState::success || state_ == TaskState::failure) { - resetState(); - if (!newInput.empty()) input_ = newInput; - return true; - } - return false; - } - - bool waitForCompletion(int timeoutMs = -1) { - std::unique_lock lock(mutex_); - auto isTerminated = [this] { - auto ste = state_.load(std::memory_order_relaxed); - return ste == TaskState::success || ste == TaskState::failure; - }; - - if (isTerminated()) return true; - - if (timeoutMs < 0) { - cv_.wait(lock, isTerminated); - return true; - } - return cv_.wait_for(lock, std::chrono::milliseconds(timeoutMs), isTerminated); - } - - bool isRunning() const { return state_.load(std::memory_order_acquire) == TaskState::running; } - bool isCancelRequested() const { return cancelRequested_.load(std::memory_order_acquire); } - - void beginExecute() { - { - std::lock_guard lock(mutex_); - if (state_.load(std::memory_order_relaxed) != TaskState::pending) { - throw std::runtime_error("nanotask: task not in pending state"); - } - state_.store(TaskState::running, std::memory_order_release); - startTime_ = std::chrono::system_clock::now(); - } - if (observer_ != nullptr) { - observer_->onTaskStarted(taskId_, input_); - } - } - - void updateProgress(const json& progressInfo) { - if (observer_ != nullptr) { - observer_->onTaskProgress(taskId_, progressInfo); - } - } - - void finishExecute(const json& resultData) { - { - std::lock_guard lock(mutex_); - if (state_.load(std::memory_order_relaxed) != TaskState::running) return; - state_.store(TaskState::success, std::memory_order_release); - endTime_ = std::chrono::system_clock::now(); - result_.state = TaskState::success; - result_.data = resultData; - cv_.notify_all(); - } - if (observer_ != nullptr) { - observer_->onTaskCompleted(taskId_, result_); - } - } - - void failExecute(const std::string& message) { - { - std::lock_guard lock(mutex_); - auto ste = state_.load(std::memory_order_relaxed); - if (ste != TaskState::running && ste != TaskState::pending) return; - state_.store(TaskState::failure, std::memory_order_release); - endTime_ = std::chrono::system_clock::now(); - result_.state = TaskState::failure; - result_.errorMessage = message; - cv_.notify_all(); - } - if (observer_ != nullptr) { - observer_->onTaskCompleted(taskId_, result_); - observer_->onTaskError(taskId_, message); - } - } - - private: - friend class TaskPool; - - void resetState() { - state_.store(TaskState::pending, std::memory_order_release); - result_ = TaskResult{}; - cancelRequested_.store(false, std::memory_order_release); - } - - std::string taskId_; - std::atomic state_{TaskState::pending}; - TaskLifecycle lifecycle_{TaskLifecycle::disposable}; - TaskPriority priority_{TaskPriority::normal}; - - TaskResult result_; - json input_; - TaskObserver* observer_ = nullptr; - - std::chrono::system_clock::time_point startTime_; - std::chrono::system_clock::time_point endTime_; - - int maxRetries_{0}; - int currentRetries_{0}; - std::vector dependencies_; - - mutable std::mutex mutex_; - std::condition_variable cv_; - std::atomic cancelRequested_{false}; -}; - -class FunctionalTask : public TaskBase { - public: - using ProgressFn = std::function; - using CancelFn = std::function; - using TaskFn = std::function; - - FunctionalTask(std::string_view taskId, TaskFn func) : TaskBase(taskId), func_(std::move(func)) {} - - std::string getName() const override { return "FunctionalTask"; } - std::string getDescription() const override { return "Task defined by a lambda"; } - - void execute() override { - try { - json result = func_( - getInput(), [this](const json& info) { this->updateProgress(info); }, - [this] { return this->isCancelRequested(); }); - - if (!isCancelRequested()) { - finishExecute(result); - } else { - failExecute("task_cancelled"); - } - } catch (const std::exception& e) { - failExecute(e.what()); - } catch (...) { - failExecute("unknown_exception"); - } - } - - void cleanup() override {} - - private: - TaskFn func_; -}; - -} // namespace velo diff --git a/include/velotask/taskmanager.hpp b/include/velotask/taskmanager.hpp deleted file mode 100644 index b887d5e..0000000 --- a/include/velotask/taskmanager.hpp +++ /dev/null @@ -1,388 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "taskbase.hpp" -#include "taskpool.hpp" - -namespace velo { - -struct TaskInfo { - std::string taskId; - std::string name; - std::string description; - TaskState state = TaskState::pending; - TaskLifecycle lifecycle{TaskLifecycle::disposable}; - - json input; - json resultData; - json currentProgressInfo; - std::string errorMessage; - - std::chrono::system_clock::time_point createdTime; - std::chrono::system_clock::time_point updatedTime; - std::chrono::system_clock::time_point startTime; - std::chrono::system_clock::time_point endTime; -}; - -class TaskManager : public TaskObserver { - public: - static TaskManager& getInstance() { - static TaskManager instance; - return instance; - } - - ~TaskManager() override { - stopCleanup_ = true; - cleanupCv_.notify_all(); - if (infoCleanupThread_.joinable()) { - infoCleanupThread_.join(); - } - } - - TaskManager(const TaskManager&) = delete; - TaskManager& operator=(const TaskManager&) = delete; - - template - void registerTaskType(const std::string& taskTypeName) { - std::lock_guard lock(taskFactoriesMutex_); - taskFactories_[taskTypeName] = createAdapterFactory(); - } - - std::string submitTask(const std::string& taskTypeName, const json& input = {}, - TaskLifecycle lifecycle = TaskLifecycle::disposable, - TaskPriority priority = TaskPriority::normal) { - std::shared_ptr task; - { - std::lock_guard lock(taskFactoriesMutex_); - auto it = taskFactories_.find(taskTypeName); - if (it == taskFactories_.end()) { - throw std::runtime_error("Unknown task type: " + taskTypeName); - } - task = it->second(generateTaskId(), lifecycle, priority); - } - - task->setObserver(this); - - std::string taskId = taskPool_.submitTask(task, input); - { - std::lock_guard lock(taskInfosMutex_); - TaskInfo info; - info.taskId = taskId; - info.lifecycle = lifecycle; - info.name = task->getName(); - info.description = task->getDescription(); - info.input = input; - info.createdTime = std::chrono::system_clock::now(); - info.updatedTime = info.createdTime; - - taskInfos_[taskId] = info; - } - return taskId; - } - - std::string submitTask(FunctionalTask::TaskFn func, const json& input = {}, - TaskLifecycle lifecycle = TaskLifecycle::disposable, - TaskPriority priority = TaskPriority::normal) { - std::string taskId = generateTaskId(); - auto task = std::make_shared(taskId, std::move(func)); - task->setObserver(this); - (void)taskPool_.submitTask(task, input); - { - std::lock_guard lock(taskInfosMutex_); - TaskInfo info; - info.taskId = taskId; - info.lifecycle = lifecycle; - info.name = task->getName(); - info.description = task->getDescription(); - info.input = input; - info.createdTime = std::chrono::system_clock::now(); - info.updatedTime = info.createdTime; - - taskInfos_[taskId] = info; - } - return taskId; - } - - std::string submitTask(const std::shared_ptr& task, const json& input = {}) { - task->setObserver(this); - std::string taskId = taskPool_.submitTask(task, input); - { - std::lock_guard lock(taskInfosMutex_); - TaskInfo info; - info.taskId = taskId; - info.lifecycle = task->getLifecycle(); - info.name = task->getName(); - info.description = task->getDescription(); - info.input = input; - info.createdTime = std::chrono::system_clock::now(); - info.updatedTime = info.createdTime; - - taskInfos_[taskId] = info; - } - return taskId; - } - - // Batch submit tasks - std::vector submitTasks(const std::vector>& taskSpecs) { - std::vector taskIds; - taskIds.reserve(taskSpecs.size()); - - { - std::lock_guard lock(taskFactoriesMutex_); - std::lock_guard lockInfos(taskInfosMutex_); - - for (const auto& [taskTypeName, input, lifecycle, priority] : taskSpecs) { - auto it = taskFactories_.find(taskTypeName); - if (it == taskFactories_.end()) { - throw std::runtime_error("Unknown task type: " + taskTypeName); - } - std::string taskId = generateTaskId(); - auto task = it->second(taskId, lifecycle, priority); - task->setObserver(this); - taskPool_.submitTask(task, input); - - TaskInfo info; - info.taskId = taskId; - info.lifecycle = lifecycle; - info.name = task->getName(); - info.description = task->getDescription(); - info.input = input; - info.createdTime = std::chrono::system_clock::now(); - info.updatedTime = info.createdTime; - - taskInfos_[taskId] = info; - taskIds.push_back(taskId); - } - } - return taskIds; - } - - // Batch cancel tasks - std::vector cancelTasks(const std::vector& taskIds) { - std::vector results; - results.reserve(taskIds.size()); - for (const auto& id : taskIds) { - results.push_back(cancelTask(id)); - } - return results; - } - - bool restartTask(const std::string& taskId, const json& newInput = {}) { - if (!taskPool_.restartTask(taskId, newInput)) { - return false; - } - - { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - it->second.state = TaskState::pending; - it->second.resultData.clear(); - it->second.errorMessage.clear(); - if (!newInput.empty()) { - it->second.input = newInput; - } - it->second.updatedTime = std::chrono::system_clock::now(); - } - } - - return true; - } - - bool hasTask(const std::string& taskId) const { - std::lock_guard lock(taskInfosMutex_); - return taskInfos_.find(taskId) != taskInfos_.end(); - } - - bool cancelTask(const std::string& taskId) { return taskPool_.cancelTask(taskId); } - - void cleanup() { - taskPool_.cleanup(); - { - std::lock_guard lock(taskInfosMutex_); - taskInfos_.clear(); - } - } - - std::optional getTaskInfo(const std::string& taskId) const { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - return it->second; - } - return std::nullopt; - } - - TaskState getTaskState(const std::string& taskId) const { - auto info = getTaskInfo(taskId); - return info ? info->state : TaskState::pending; - } - - json getTaskResult(const std::string& taskId) const { - auto info = getTaskInfo(taskId); - if (info && !info->resultData.empty()) { - return info->resultData; - } - return json{}; - } - - std::string getTaskErrorMessage(const std::string& taskId) const { - auto info = getTaskInfo(taskId); - if (info && !info->errorMessage.empty()) { - return info->errorMessage; - } - return {}; - } - - json getTaskProgress(const std::string& taskId) const { - auto info = getTaskInfo(taskId); - if (info && !info->currentProgressInfo.empty()) { - return info->currentProgressInfo; - } - return json{}; - } - - void onTaskStarted(const std::string& taskId, const json& input) override { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - it->second.state = TaskState::running; - it->second.startTime = std::chrono::system_clock::now(); - it->second.updatedTime = it->second.startTime; - } - } - - void onTaskCompleted(const std::string& taskId, const TaskResult& result) override { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - it->second.state = result.state; - it->second.resultData = result.data; - it->second.endTime = std::chrono::system_clock::now(); - it->second.updatedTime = it->second.endTime; - } - } - - void onTaskProgress(const std::string& taskId, const json& progress) override { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - it->second.updatedTime = std::chrono::system_clock::now(); - it->second.currentProgressInfo = progress; - } - } - - void onTaskError(const std::string& taskId, const std::string& errorMsg) override { - std::lock_guard lock(taskInfosMutex_); - auto it = taskInfos_.find(taskId); - if (it != taskInfos_.end()) { - it->second.errorMessage = errorMsg; - } - } - - private: - explicit TaskManager(size_t threadCount = std::thread::hardware_concurrency()) : taskPool_(threadCount) { - infoCleanupThread_ = std::thread([this] { cleanupInfoLoop(); }); - } - - TaskPool taskPool_; - - std::unordered_map taskInfos_; - mutable std::mutex taskInfosMutex_; - - using TaskFactory = std::function(const std::string&, TaskLifecycle, TaskPriority)>; - - std::unordered_map taskFactories_; - mutable std::mutex taskFactoriesMutex_; - - std::thread infoCleanupThread_; - std::atomic stopCleanup_{false}; - const std::chrono::hours maxInfoAge{24}; - const size_t maxInfoCount{1000}; - std::condition_variable cleanupCv_; - - template - TaskFactory createAdapterFactory() { - if constexpr (std::is_constructible_v) { - return [](const std::string& taskId, TaskLifecycle lifecycle, TaskPriority priority) { - return std::make_shared(taskId, lifecycle, priority); - }; - } else if constexpr (std::is_constructible_v) { - return [](const std::string& taskId, TaskLifecycle lifecycle, TaskPriority priority) { - auto task = std::make_shared(taskId, lifecycle); - task->setPriority(priority); - return task; - }; - } else if constexpr (std::is_constructible_v) { - return [](const std::string& taskId, TaskLifecycle lifecycle, TaskPriority priority) { - auto task = std::make_shared(taskId); - task->setPriority(priority); - return task; - }; - } - return {}; - } - - void cleanupInfoLoop() { - while (!stopCleanup_) { - auto now = std::chrono::system_clock::now(); - - std::unique_lock lock(taskInfosMutex_); - cleanupCv_.wait_for(lock, std::chrono::minutes(5), [this] { return stopCleanup_.load(); }); - - if (stopCleanup_) break; - - auto it = taskInfos_.begin(); - while (it != taskInfos_.end()) { - const auto& info = it->second; - bool isTerminated = (info.state == TaskState::success || info.state == TaskState::failure); - bool isExpired = (now - info.updatedTime) > maxInfoAge; - - if (isTerminated && isExpired) { - it = taskInfos_.erase(it); - } else { - ++it; - } - } - - if (taskInfos_.size() > maxInfoCount) { - forceTrimInfos(); - } - } - } - - void forceTrimInfos() { - struct TrimCandidate { - std::string id; - std::chrono::system_clock::time_point updatedTime; - }; - std::vector candidates; - - for (const auto& [id, info] : taskInfos_) { - if ((info.state == TaskState::success || info.state == TaskState::failure) && - info.lifecycle == TaskLifecycle::disposable) { - candidates.push_back({id, info.updatedTime}); - } - } - - if (candidates.empty()) return; - - std::sort(candidates.begin(), candidates.end(), - [](const auto& a, const auto& b) { return a.updatedTime < b.updatedTime; }); - - size_t excess = taskInfos_.size() - maxInfoCount; - size_t toRemove = std::min(excess, candidates.size()); - - for (size_t i = 0; i < toRemove; ++i) { - taskInfos_.erase(candidates[i].id); - } - } -}; - -} // namespace velo diff --git a/include/velotask/taskpool.hpp b/include/velotask/taskpool.hpp deleted file mode 100644 index 27f43e0..0000000 --- a/include/velotask/taskpool.hpp +++ /dev/null @@ -1,236 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "taskbase.hpp" -#include "threadpool.hpp" - -namespace velo { - -class TaskPool { - ThreadPool threadPool_; - std::multimap, std::greater<>> pendingQueue_; - std::unordered_map> runningTasks_; - std::unordered_map> persistentTasks_; - std::list> completedTasks_; - std::unordered_set completedTaskIds_; - - mutable std::mutex tasksMutex_; - std::condition_variable schedulerCv_; - - std::thread schedulerThread_; - std::thread cleanDisposedThread_; - - std::atomic stopScheduler_{false}; - - public: - explicit TaskPool(size_t threadCount = std::thread::hardware_concurrency()) : threadPool_(threadCount) { - schedulerThread_ = std::thread([this] { schedulerLoop(); }); - cleanDisposedThread_ = std::thread([this] { cleanupLoop(); }); - } - - ~TaskPool() { - stopScheduler_ = true; - schedulerCv_.notify_all(); - - if (schedulerThread_.joinable()) { - schedulerThread_.join(); - } - if (cleanDisposedThread_.joinable()) { - cleanDisposedThread_.join(); - } - } - - std::string submitTask(const std::shared_ptr& task, const json& input = {}) { - std::string taskId = task->getTaskId(); - if (taskId.empty()) { - taskId = generateTaskId(); - task->setTaskId(taskId); - } - task->setInput(input); - { - std::lock_guard lock(tasksMutex_); - if (task->getLifecycle() == TaskLifecycle::persistent) { - persistentTasks_[taskId] = task; - } - pendingQueue_.insert({task->getPriority(), task}); - } - - schedulerCv_.notify_one(); - - return taskId; - } - - bool restartTask(const std::string& taskId, const json& newInput = {}) { - std::shared_ptr task; - { - std::lock_guard lock(tasksMutex_); - auto it = persistentTasks_.find(taskId); - if (it != persistentTasks_.end()) { - task = it->second; - } - } - - if (task && task->restart(newInput)) { - { - std::lock_guard lock(tasksMutex_); - pendingQueue_.insert({task->getPriority(), task}); - } - schedulerCv_.notify_one(); - return true; - } - - return false; - } - - std::shared_ptr getTask(const std::string& taskId) const { - std::lock_guard lock(tasksMutex_); - - if (auto it = runningTasks_.find(taskId); it != runningTasks_.end()) return it->second; - if (auto it = persistentTasks_.find(taskId); it != persistentTasks_.end()) return it->second; - - for (const auto& [priority, task] : pendingQueue_) { - if (task->getTaskId() == taskId) return task; - } - - for (const auto& completedTask : completedTasks_) { - if (completedTask->getTaskId() == taskId) return completedTask; - } - return nullptr; - } - - bool cancelTask(const std::string& taskId) const { - auto task = getTask(taskId); - if (task && (task->getState() == TaskState::running || task->getState() == TaskState::pending)) { - task->cancel(); - task->waitForCompletion(); - task->cleanup(); - return true; - } - return false; - } - - void cleanup() { - threadPool_.cleanup_queue(); - - std::lock_guard lock(tasksMutex_); - for (auto& [id, task] : runningTasks_) { - task->cancel(); - } - - pendingQueue_.clear(); - runningTasks_.clear(); - persistentTasks_.clear(); - completedTasks_.clear(); - completedTaskIds_.clear(); - } - - private: - void schedulerLoop() { - while (!stopScheduler_) { - std::vector> tasksToExecute; - { - std::unique_lock lock(tasksMutex_); - schedulerCv_.wait_for(lock, std::chrono::milliseconds(100), - [this] { return stopScheduler_ || !pendingQueue_.empty(); }); - if (stopScheduler_) return; - - auto it = pendingQueue_.begin(); - while (it != pendingQueue_.end()) { - auto task = it->second; - if (task->getState() == TaskState::pending) { - // Check dependencies - bool depsSatisfied = task->dependencies_.empty(); - if (!depsSatisfied) { - depsSatisfied = true; - for (const auto& dep : task->dependencies_) { - if (completedTaskIds_.find(dep) == completedTaskIds_.end()) { - depsSatisfied = false; - break; - } - } - } - if (depsSatisfied) { - runningTasks_[task->getTaskId()] = task; - tasksToExecute.push_back(task); - it = pendingQueue_.erase(it); - } else { - ++it; - } - } else { - it = pendingQueue_.erase(it); - } - } - } - - // Execute tasks outside the lock - for (auto& task : tasksToExecute) { - threadPool_.execute([this, task]() { executeTask(task); }); - } - } - } - - void executeTask(const std::shared_ptr& task) { - task->beginExecute(); - try { - task->execute(); - } catch (const std::exception& e) { - task->failExecute(e.what()); - } catch (...) { - task->failExecute("Unknown error occurred during execution."); - } - - // Handle retries - if (task->getState() == TaskState::failure && task->currentRetries_ < task->maxRetries_) { - task->currentRetries_++; - // Reset state for retry - task->state_.store(TaskState::pending, std::memory_order_release); - task->result_ = TaskResult{}; - task->cancelRequested_.store(false, std::memory_order_release); - // Resubmit to queue - { - std::lock_guard lock(tasksMutex_); - pendingQueue_.insert({task->getPriority(), task}); - } - schedulerCv_.notify_one(); - return; - } - - taskCompleted(task); - } - - void taskCompleted(const std::shared_ptr& task) { - std::lock_guard lock(tasksMutex_); - runningTasks_.erase(task->getTaskId()); - completedTasks_.push_back(task); - completedTaskIds_.insert(task->getTaskId()); - } - - void cleanupLoop() { - while (!stopScheduler_) { - std::unique_lock lock(tasksMutex_); - schedulerCv_.wait_for(lock, std::chrono::seconds(60), [this] { return stopScheduler_.load(); }); - - if (stopScheduler_) return; - - auto it = completedTasks_.begin(); - while (it != completedTasks_.end()) { - if ((*it)->getLifecycle() == TaskLifecycle::disposable) { - completedTaskIds_.erase((*it)->getTaskId()); - it = completedTasks_.erase(it); - } else { - ++it; - } - } - } - } -}; - -} // namespace velo