diff --git a/common/src/apis.rs b/common/src/apis.rs
index 6409d2d6..18fb91dd 100644
--- a/common/src/apis.rs
+++ b/common/src/apis.rs
@@ -137,6 +137,29 @@ impl Default for ApplicationAttributes {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct SessionAttributes {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: u32,
+    pub common_data: Option<CommonData>,
+    pub min_instances: u32,
+    pub max_instances: Option<u32>,
+}
+
+impl Default for SessionAttributes {
+    fn default() -> Self {
+        Self {
+            id: String::new(),
+            application: String::new(),
+            slots: 1,
+            common_data: None,
+            min_instances: 0,
+            max_instances: None,
+        }
+    }
+}
+
 #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, strum_macros::Display)]
 pub enum SessionState {
     #[default]
@@ -164,6 +187,8 @@ pub struct Session {
     pub events: Vec,
     pub status: SessionStatus,
+    pub min_instances: u32, // Minimum number of instances
+    pub max_instances: Option<u32>, // Maximum number of instances (None means unlimited)
 }
 
 #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, strum_macros::Display)]
@@ -559,6 +584,8 @@ impl Clone for Session {
             completion_time: self.completion_time,
             events: self.events.clone(),
             status: self.status.clone(),
+            min_instances: self.min_instances,
+            max_instances: self.max_instances,
         };
 
         for (id, t) in &self.tasks {
@@ -772,6 +799,8 @@ impl From<&Session> for rpc::Session {
                 application: ssn.application.clone(),
                 slots: ssn.slots,
                 common_data: ssn.common_data.clone().map(CommonData::into),
+                min_instances: ssn.min_instances,
+                max_instances: ssn.max_instances,
             }),
             status: Some(status),
         }
diff --git a/docs/designs/RFE323-runner-v2/FS.md b/docs/designs/RFE323-runner-v2/FS.md
new file mode 100644
index 00000000..a4b64512
--- /dev/null
+++ b/docs/designs/RFE323-runner-v2/FS.md
@@ -0,0 +1,1471 @@
+# RFE323: Enhanced Runner Service Configuration
+
+## 1. Motivation
+
+**Background:**
+
+Currently, `Runner.service()` accepts a `kind` parameter of type `RunnerServiceKind` to configure whether a service is stateful or stateless. When the `kind` parameter is not explicitly provided, a default value is determined automatically based on the execution object type (functions/builtins are stateless, everything else is stateful). However, this simple model has significant limitations:
+
+1. **Instance Count Control**: There's no way to control how many instances of a service should be created. Some use cases require a single long-lived service instance (e.g., a database connection pool), while others benefit from multiple instances that can be created dynamically based on workload (e.g., stateless request handlers).
+
+2. **Conflated Concerns**: The binary `RunnerServiceKind` (stateful/stateless) conflates two separate concerns: state persistence and instance scaling. A service might need state persistence but also benefit from multiple instances, or might be stateless but require a single instance for resource management (e.g., connection pools). This design replaces `RunnerServiceKind` with explicit `stateful` and `autoscale` parameters to decouple these concerns.
+
+3. **Inefficient Execution Model**: Currently, the execution object is loaded on every task invocation (`on_task_invoke`), which is inefficient for stateful services that should maintain state across multiple tasks.
+
+4. **Class Instantiation Timing**: When a class is passed to `Runner.service()`, it's instantiated immediately in the client code. This means the class state is serialized and sent to executors, which may not be the desired behavior.
For better distribution, the class itself should be serialized, and instances should be created on each executor. + +**Target:** + +This design aims to enhance `Runner.service()` configuration to achieve: + +1. **Explicit State Control**: Allow users to explicitly specify whether a service should persist state, independent of the execution object type. + +2. **Autoscaling Support**: Enable services to scale automatically based on pending tasks, or maintain a single instance for services that require it. + +3. **Efficient Execution**: Load execution objects once per session rather than per task, improving performance. + +4. **Better Distribution**: Serialize classes rather than instances, allowing each executor to create its own instance. + +5. **Sensible Defaults**: Maintain intelligent defaults based on execution object type to minimize required configuration. + +## 2. Function Specification + +### Configuration + +**Runner.service() Parameters:** + +The `Runner.service()` method will be enhanced with the following parameters: + +```python +def service( + self, + execution_object: Any, + stateful: Optional[bool] = None, + autoscale: Optional[bool] = None +) -> RunnerService: + """Create a RunnerService for the given execution object. + + Args: + execution_object: A function, class, or class instance to expose as a service + stateful: If True, persist the execution object state back to flame-cache + after each task. If False, do not persist state. If None, use default + based on execution_object type. + autoscale: If True, create instances dynamically based on pending tasks (min=0, max=None). + If False, create exactly one instance (min=1, max=1). + If None, use default based on execution_object type. + + Returns: + A RunnerService instance + """ +``` + +**Default Values:** + +When `stateful` or `autoscale` are not explicitly set (None), the following defaults apply: + +| Execution Object Type | stateful (default) | autoscale (default) | +| ----------------------- | ------------------ | ------------------- | +| Function | False | True | +| Class | False | False | +| Class Instance (object) | False | False | + +**Rationale for Defaults:** +- **Functions**: Typically stateless and benefit from autoscaling (e.g., request handlers) +- **Classes**: Users usually want a single instance per executor; state management is explicit +- **Objects**: Already instantiated, typically represent a single stateful entity + +**Configuration Validation:** + +The following validations will be enforced: +- If `execution_object` is a class and `stateful=True`, an error will be raised (classes themselves cannot maintain state; only instances can) +- If `stateful=True`, the execution object must be pickleable (will be validated at runtime) + +### API + +**RunnerContext Structure:** + +The `RunnerContext` dataclass will be updated to replace `kind` with `stateful` and `autoscale` fields: + +```python +@dataclass +class RunnerContext: + """Context for runner session containing the shared execution object. + + Attributes: + execution_object: The execution object for the session. 
Can be: + - A function (callable) + - A class (will be instantiated in on_session_enter) + - A class instance (already instantiated object) + stateful: Whether to persist the execution object state back to cache + autoscale: Whether to autoscale instances based on pending tasks + min_instances: Minimum number of instances (derived from autoscale) + max_instances: Maximum number of instances (derived from autoscale) + """ + execution_object: Any + stateful: bool = False + autoscale: bool = True + min_instances: int = field(init=False) + max_instances: Optional[int] = field(init=False) + + def __post_init__(self) -> None: + # Set min/max instances based on autoscale + if self.autoscale: + self.min_instances = 0 + self.max_instances = None # No limit + else: + self.min_instances = 1 + self.max_instances = 1 +``` + +**Session Configuration:** + +The session manager will use `min_instances` and `max_instances` from `RunnerContext` to control executor allocation: +- `min_instances`: Minimum number of executors to keep alive for this session +- `max_instances`: Maximum number of executors allowed for this session (None = unlimited) + +### CLI + +**flmctl session view command:** + +The `flmctl view -s ` command should display the `min_instances` and `max_instances` configuration for each session. This helps administrators understand the scaling behavior of running sessions. + +Example output: +```bash +$ flmctl view -s sess_abc123 +Session ID: sess_abc123 +Application: my-runner-app +Status: Running +Min Instances: 0 +Max Instances: unlimited +... +``` + +For sessions with `autoscale=False`: +```bash +$ flmctl view -s sess_xyz789 +Session ID: sess_xyz789 +Application: stateful-service +Status: Running +Min Instances: 1 +Max Instances: 1 +... +``` + +### Other Interfaces + +**Session Manager Interface (RPC API):** + +The `SessionSpec` message in the RPC API (`rpc/protos/types.proto`) needs to include `min_instances` and `max_instances` fields: + +```protobuf +message SessionSpec { + string application = 2; + uint32 slots = 3; + optional bytes common_data = 4; + uint32 min_instances = 5; + optional uint32 max_instances = 6; // NULL means unlimited +} +``` + +**Session Creation Flow:** +1. **Python SDK (runner.py)**: Creates `RunnerContext` with `stateful`, `autoscale`, `min_instances`, `max_instances` +2. **Python SDK (runner.py)**: Calls `create_session()` with `CreateSessionRequest` containing `SessionSpec` +3. **Python SDK (client.py)**: Populates `SessionSpec.min_instances` and `SessionSpec.max_instances` from `RunnerContext.min_instances` and `RunnerContext.max_instances` +4. **Session Manager**: Receives `CreateSessionRequest`, reads `min_instances` and `max_instances` directly from `SessionSpec` +5. **Session Manager**: Applies fallback - if `max_instances` is None, uses `Application.max_instances` +6. **Session Manager**: Validates that `min_instances <= max_instances` (if max_instances is not None) +7. **Session Manager**: Inserts session into database table with `min_instances` and effective `max_instances` columns +8. **Session Manager**: Creates internal `Session` struct and returns (executor allocation is asynchronous) +9. **Scheduler Loop** (runs every ~1 second): Checks if sessions are underused via FairShare plugin +10. **Scheduler**: If underused (including when `allocated < min_instances`), allocates executors via AllocateAction +11. 
**Scheduler**: Continues until session reaches `deserved` allocation (which is >= min_instances * slots) + +**Session Loading Flow (Recovery after restart):** +1. **Session Manager**: Loads sessions from database (e.g., after session manager restart) +2. **Session Manager**: Reads all fields including `min_instances` and `max_instances` from session table +3. **Session Manager**: Constructs `SessionSpec` protobuf message with values from database +4. **Session Manager**: Sets `SessionSpec.min_instances` from table's `min_instances` column +5. **Session Manager**: Sets `SessionSpec.max_instances` from table's `max_instances` column (NULL maps to None) +6. **Session Manager**: Constructs `Session` protobuf message with populated `SessionSpec` +7. **Scheduler**: Restores session state and ensures `min_instances` executors are allocated +8. **Scheduler**: Resumes normal scheduling based on `min_instances` and `max_instances` configuration + +**Validation Rules:** +- `min_instances` must be >= 0 +- If `max_instances` is `Some(value)`, then `min_instances <= value` +- If validation fails, return an error when creating the session + +**SDK Client Interface:** +- **Python SDK (runner.py)**: When calling `create_session()`, populates `SessionSpec` in `CreateSessionRequest` +- **Python SDK**: Sets `SessionSpec.min_instances = RunnerContext.min_instances` (as uint32) +- **Python SDK**: Sets `SessionSpec.max_instances = RunnerContext.max_instances` (as optional uint32, None means unlimited) +- **Python SDK**: Includes serialized `RunnerContext` in `SessionSpec.common_data` for use by executors +- The protobuf-generated `SessionSpec` structure is used across all SDKs (Python, Rust, Go) +- All language SDKs use the same `SessionSpec` message format from `rpc/protos/types.proto` + +**Session Query Interface (flmctl):** +- `flmctl view -s ` reads `min_instances` and `max_instances` from `SessionSpec` in the session table +- Display these values in the command output + +**Executor Manager Interface (Internal):** + +No changes required. The executor manager continues to handle executor lifecycle as directed by the session manager. 
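+
+The fallback and validation described in the creation flow above (steps 5 and 6) boil down to a small piece of logic. The sketch below is illustrative only: it uses plain `u32`/`String` stand-ins instead of the real `SessionSpec` and `FlameError` types, and the full creation path is covered in Algorithm 4 of Section 3.
+
+```rust
+/// Resolve the effective max_instances and validate it against min_instances.
+/// `spec_max` comes from SessionSpec, `app_max` from the Application; `None`
+/// means "unlimited". The error type is simplified for illustration.
+fn effective_max_instances(
+    spec_min: u32,
+    spec_max: Option<u32>,
+    app_max: Option<u32>,
+) -> Result<Option<u32>, String> {
+    // Fallback: an unlimited session inherits the application limit, if any.
+    let effective = spec_max.or(app_max);
+
+    // min_instances >= 0 holds by construction (u32); only min <= max needs checking.
+    if let Some(max) = effective {
+        if spec_min > max {
+            return Err(format!(
+                "min_instances ({}) must be <= max_instances ({})",
+                spec_min, max
+            ));
+        }
+    }
+
+    Ok(effective)
+}
+```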
+ +### Scope + +**In Scope:** +- Add `stateful` and `autoscale` parameters to `Runner.service()` +- Update `RunnerContext` to include `stateful`, `autoscale`, `min_instances`, `max_instances` +- Remove `RunnerServiceKind` enum and `kind` parameter from `Runner.service()` +- Move execution object loading from `on_task_invoke` to `on_session_enter` in `FlameRunpyService` +- Implement class instantiation in `on_session_enter` (if execution object is a class) +- Persist execution object state back to cache after each task if `stateful=True` +- Update `Runner.service()` to serialize classes (not instances) when a class is provided +- Update `SessionSpec` protobuf message to add `min_instances` and `max_instances` fields +- Regenerate protobuf bindings for all supported languages +- Create database migration `20260123000000_add_session_instances.sql` +- Update session table schema to add `min_instances` and `max_instances` columns +- Update `SessionDao` struct and `TryFrom` implementation +- Update internal `Session` struct in `common/src/apis.rs` +- Update `SessionInfo` struct in `session_manager/src/model/mod.rs` +- Update INSERT/SELECT queries in `sqlite.rs` to handle new columns +- Implement validation in session creation to ensure `min_instances <= max_instances` +- Implement fallback logic: if `SessionSpec.max_instances` is None, use `Application.max_instances` +- Persist `min_instances` and `max_instances` in session table for durability +- Update fairshare scheduler plugin to: + - Respect session's `min_instances` and `max_instances` + - Ensure `desired >= min_instances * slots` (minimum guarantee) + - Cap allocation at `max_instances` (already includes app limit from creation) + - Update `SSNInfo` struct with new fields +- Update `AllocateAction` scheduler action to: + - Add explicit `max_instances` check before creating executors + - Prevent over-allocation when multiple executors are created in one scheduler cycle +- Update `flmctl view -s` command to display `min_instances` and `max_instances` from session table + +**Out of Scope:** +- Advanced autoscaling policies (e.g., based on CPU/memory metrics) +- Custom scaling rules (e.g., scale by time of day) +- Load balancing algorithms (continue to use existing session manager logic) +- Dynamic adjustment of min/max instances after session creation + +**Limitations:** +- Autoscaling is reactive (based on pending tasks), not predictive +- No support for scaling down executors (min_instances executors remain allocated) +- State persistence is all-or-nothing (cannot persist partial state) +- Class instantiation uses default constructor only (no custom initialization parameters) +- `min_instances` and `max_instances` are derived from `autoscale` and cannot be set independently + +### Feature Interaction + +**Related Features:** +- **RPC API (protobuf)**: Session configuration is part of the core RPC API interface +- **Session Management**: Session manager must respect min/max instances when allocating executors +- **Executor Management**: Executor lifecycle is controlled by session manager based on instance limits +- **Object Cache**: Stateful services use cache to persist execution object state +- **RL Module**: Uses RunnerService for remote execution of Python code + +**Updates Required:** + +1. 
**runner.py**: + - Update `Runner.service()` signature to accept `stateful` and `autoscale` parameters + - Remove class instantiation logic (keep class as-is, don't instantiate) + - Update `RunnerService.__init__()` to pass new parameters to `RunnerContext` + - Update default logic to determine `stateful` and `autoscale` when not specified + - When calling `create_session()`, populate `SessionSpec.min_instances` and `SessionSpec.max_instances` from `RunnerContext.min_instances` and `RunnerContext.max_instances` + +2. **runpy.py**: + - Move execution object loading from `on_task_invoke` to `on_session_enter` + - Store execution object as instance variable (`self._execution_object`) + - In `on_session_enter`, if execution object is a class, instantiate it + - In `on_task_invoke`, use `self._execution_object` instead of loading from common data + - After task execution, if `stateful=True`, persist updated execution object to cache + - Update common data handling to store both execution object and configuration + +3. **types.py**: + - Update `RunnerContext` dataclass with new fields + - Remove `RunnerServiceKind` enum entirely + - Add `__post_init__` logic to compute `min_instances` and `max_instances` + +4. **rpc/protos/types.proto**: + - Update `SessionSpec` message to add `min_instances` (uint32) and `max_instances` (optional uint32) fields + - Regenerate protobuf bindings for all languages (Rust, Python) + +5. **session_manager (Rust)**: + + **a. Database Migration:** + - Create `session_manager/migrations/sqlite/20260123000000_add_session_instances.sql` + - Add `min_instances INTEGER NOT NULL DEFAULT 0` column + - Add `max_instances INTEGER` column (NULL means unlimited) + + **b. Storage Layer (`session_manager/src/storage/engine/`):** + - Update `SessionDao` struct in `types.rs`: + - Add `min_instances: i64` field + - Add `max_instances: Option` field + - Update `TryFrom<&SessionDao> for Session` impl in `types.rs`: + - Map `min_instances` from i64 to u32 + - Map `max_instances` from Option to Option + - Update INSERT query in `sqlite.rs` (around line 384): + - Current: `INSERT INTO sessions (id, application, slots, common_data, creation_time, state)` + - Updated: `INSERT INTO sessions (id, application, slots, common_data, creation_time, state, min_instances, max_instances)` + - Add `.bind(min_instances)` and `.bind(max_instances)` to query + - Update SELECT queries to include new columns (SQLx should handle this automatically via `SessionDao`) + + **c. Internal Session Struct (`common/src/apis.rs`):** + - Update `Session` struct: + - Add `min_instances: u32` field + - Add `max_instances: Option` field + + **d. 
Session Creation Logic:** + - When receiving `CreateSessionRequest`: + - Read `min_instances` and `max_instances` directly from `SessionSpec` in the request + - These were already populated by the Python SDK from `RunnerContext` + - No need to deserialize `RunnerContext` from `common_data` again + - **Fallback Logic**: If `SessionSpec.max_instances` is None (unlimited), use `Application.max_instances` as fallback + - Query the application from the database + - If `Application.max_instances` exists, use it as the effective limit + - If both are None, session remains truly unlimited (effective `max_instances = None`) + - This ensures sessions respect application limits when set, but allows unlimited if both are None + - Validate that `min_instances <= max_instances` (if max_instances is Some), return error if validation fails + - Store `SessionSpec` with the effective `max_instances` value in the database + + **e. Session Loading Logic:** + - When loading sessions from database: + - `SessionDao` automatically populated by SQLx with new columns + - `TryFrom` converts to internal `Session` struct + - Scheduler reads `session.min_instances` and `session.max_instances` for allocation + + **f. Model Layer (`session_manager/src/model/mod.rs`):** + - Update `SessionInfo` struct to include: + - `min_instances: u32` field + - `max_instances: Option` field + - Update `From<&Session> for SessionInfo` implementation to populate these fields + + **g. Scheduler Plugins (`session_manager/src/scheduler/plugins/`):** + - Update `fairshare.rs` plugin: + - Update `SSNInfo` struct to include `min_instances` and `max_instances` fields + - In `setup()` method (around line 127-146): + - Read `session.min_instances` and `session.max_instances` from `SessionInfo` + - Note: `session.max_instances` already includes application limit (applied during session creation) + - Cap desired executors: `desired = desired.min((session.max_instances * ssn.slots) as f64)` if max_instances is Some + - Ensure minimum allocation: `desired = desired.max((session.min_instances * ssn.slots) as f64)` + - During the fairshare loop (calculating deserved from remaining slots): + - Initialize `deserved = min_instances * slots` (guarantee minimum) + - Only sessions with `deserved < desired` participate in fairshare distribution + - When distributing slots, cap each session's deserved by its `desired` value + - This ensures `min_instances <= deserved <= desired <= max_instances` for all sessions + - Update `is_underused()` method to check if `allocated < deserved` + - Note: deserved is already guaranteed to be >= min_instances from the calculation above + + **h. 
Scheduler (`session_manager/src/scheduler/`):** + - Read `min_instances` and `max_instances` from `Session` struct and pass to `SessionInfo` + - Executor allocation happens asynchronously in the scheduler loop (not immediately on session creation): + - **Scheduler Loop**: Runs periodically (every `schedule_interval` ms, typically 1000ms) + - **AllocateAction** (`scheduler/actions/allocate.rs`): Executed as part of scheduler actions + - Gets all open sessions, orders them by fairshare plugin's `ssn_order_fn` + - For each session, checks `plugins.is_underused(ssn)` + - **NEW**: Add explicit `max_instances` check by counting actual executors from snapshot + - This prevents over-allocation when multiple executors are created in one cycle + - Fairshare's cached `allocated` count doesn't update within a cycle + - If underused and below `max_instances`, allocates executors + - Calls `ctx.create_executor(node, ssn)` to actually create executor + - Delegate allocation decisions to scheduler plugins (fairshare) + - Plugins respect both `min_instances` (guaranteed) and `max_instances` (limit) + - Note: Sessions with `min_instances > 0` will be allocated executors in the first scheduler cycle (within ~1 second) + +5. **Examples and Tests**: + - Update existing examples to use new API + - Add new examples demonstrating `stateful` and `autoscale` usage + - Update integration tests to verify new behavior + +**Integration Points:** +- **Python SDK → Cache**: `RunnerContext` is serialized with cloudpickle and stored in flame-object-cache +- **Python SDK → Session Manager**: `SessionSpec` protobuf message (via RPC) contains `min_instances`, `max_instances`, and reference to cached `RunnerContext` +- **Session Manager → Database**: `SessionSpec` fields are persisted to session table +- **Session Manager → Scheduler**: Scheduler reads `min_instances` and `max_instances` from `SessionSpec` to control executor allocation +- **Executor → Cache**: `FlameRunpyService` loads `RunnerContext` from cache using reference in `SessionSpec.common_data` +- **Executor → Cache**: If `stateful=True`, persists updated execution object back to cache + +**Compatibility:** +- **Database Migration**: + - New columns have default values (min_instances=0, max_instances=NULL) + - Existing sessions will be assigned defaults upon migration + - Migration can be applied before deploying new session manager version + - Sessions in database are compatible with new schema + +**Breaking Changes:** +- `RunnerServiceKind` enum is removed from `sdk/python/src/flamepy/rl/types.py` +- `kind` parameter is removed from `Runner.service()` signature +- **Migration Required**: Users must update their code: + - Old: `runner.service(obj, kind=RunnerServiceKind.Stateful)` + - New: `runner.service(obj, stateful=True, autoscale=False)` + - Old: `runner.service(obj, kind=RunnerServiceKind.Stateless)` + - New: `runner.service(obj, stateful=False, autoscale=True)` (or omit parameters, this is the default for functions) + +## 3. 
Implementation Detail + +### Architecture + +The enhancement spans multiple layers from SDK to session manager to executors: + +``` +┌──────────────────────────────────────────────────────────┐ +│ Python SDK (runner.py) │ +│ - Accept stateful/autoscale parameters │ +│ - Serialize class (not instance) │ +│ - Create RunnerContext with min/max instances │ +│ - Store RunnerContext in cache │ +└────────────────┬─────────────────────────────────────────┘ + │ + ▼ create_session() with CreateSessionRequest +┌──────────────────────────────────────────────────────────┐ +│ Python SDK (client.py) │ +│ - Create SessionSpec protobuf message │ +│ - Set SessionSpec.min_instances from RunnerContext │ +│ - Set SessionSpec.max_instances from RunnerContext │ +│ - Set SessionSpec.common_data (RunnerContext ObjectRef) │ +└────────────────┬─────────────────────────────────────────┘ + │ + ▼ RPC: CreateSessionRequest(SessionSpec) +┌──────────────────────────────────────────────────────────┐ +│ Session Manager (Rust) │ +│ - Receive CreateSessionRequest with SessionSpec │ +│ - Read min_instances and max_instances from SessionSpec │ +│ - Apply fallback: if max=None, use app.max_instances │ +│ - Validate min_instances <= max_instances │ +│ - Store Session in database with these values │ +│ - Return (executor allocation is asynchronous) │ +└────────────────┬─────────────────────────────────────────┘ + │ + ▼ Asynchronous executor allocation (scheduler loop ~1s) +┌──────────────────────────────────────────────────────────┐ +│ Scheduler Loop (Rust) │ +│ - FairShare: calculate deserved (>= min_instances) │ +│ - AllocateAction: check is_underused() for sessions │ +│ - Create executors until allocated >= deserved │ +│ - Respect max_instances limit │ +└────────────────┬─────────────────────────────────────────┘ + │ + ▼ Task execution +┌──────────────────────────────────────────────────────────┐ +│ FlameRunpyService (runpy.py) │ +│ - on_session_enter: Load RunnerContext from cache │ +│ - Load execution object, instantiate if class │ +│ - on_task_invoke: Use cached execution object │ +│ - Persist state to cache if stateful=True │ +└──────────────────────────────────────────────────────────┘ +``` + +**Key Data Flows:** +1. **RunnerContext**: Created by SDK, stored in cache, contains execution object and config +2. **SessionSpec**: RPC API message, contains min/max instances and common_data reference +3. **Session Table**: Database storage, persists SessionSpec fields for recovery +4. **Scheduler Loop** (asynchronous executor allocation): + - Session created → stored in database + - Scheduler loop (every ~1 second) creates snapshot + - FairShare plugin calculates deserved (>= min_instances * slots) + - AllocateAction checks is_underused() for each session + - If underused, creates executors via ctx.create_executor() + - Process repeats until session reaches deserved allocation + +### Components + +**1. Runner.service() (Python SDK)** +- **Location**: `sdk/python/src/flamepy/rl/runner.py` +- **Responsibilities**: + - Accept `stateful` and `autoscale` parameters + - Determine defaults based on execution object type + - Validate configuration (e.g., class with stateful=True is invalid) + - Do NOT instantiate classes (keep as class) + - Create `RunnerContext` with configuration + - Serialize and store `RunnerContext` in cache + +**2. 
RunnerContext (Python SDK)** +- **Location**: `sdk/python/src/flamepy/rl/types.py` +- **Responsibilities**: + - Store execution object and configuration + - Compute `min_instances` and `max_instances` based on `autoscale` + - Validate configuration (e.g., classes cannot be stateful) + +**3. FlameRunpyService (Python SDK)** +- **Location**: `sdk/python/src/flamepy/rl/runpy.py` +- **Responsibilities**: + - Load execution object in `on_session_enter` (not `on_task_invoke`) + - Store execution object as instance variable + - If execution object is a class, instantiate it using default constructor + - Use stored execution object in `on_task_invoke` + - After task execution, if `stateful=True`, update cache with modified execution object + +**4. SessionSpec (RPC Protobuf)** +- **Location**: `rpc/protos/types.proto` +- **Responsibilities**: + - Define the core API interface for session configuration + - Include `min_instances` and `max_instances` as part of session specification + - Used by all components (session manager, SDK, flmctl) to communicate session configuration + - Generated into Rust, Python, and other language bindings + +**5. Session Table (Database)** +- **Location**: `session_manager` database schema +- **Responsibilities**: + - Store persistent session data including `min_instances` and `max_instances` + - Support session recovery after session manager restarts + - Enable querying sessions by instance configuration + - Map to/from `SessionSpec` protobuf message + +**6. SessionInfo Struct (Scheduler Model)** +- **Location**: `session_manager/src/model/mod.rs` +- **Responsibilities**: + - Snapshot model used by scheduler and scheduler plugins + - Include `min_instances` and `max_instances` fields from `Session` + - Provide session configuration to scheduler plugins during scheduling decisions + - Populated via `From<&Session> for SessionInfo` conversion + +**7. Session Manager (Rust)** +- **Location**: `session_manager/src/manager.rs` +- **Responsibilities**: + - **Session Creation**: + - Receive `CreateSessionRequest` with `SessionSpec` containing `min_instances` and `max_instances` + - Read values directly from `SessionSpec` (already populated by SDK from `RunnerContext`) + - **Apply Fallback**: If `SessionSpec.max_instances` is None, query `Application` and use `Application.max_instances` + - Validate that `min_instances <= max_instances`, return error if validation fails + - Store `Session` with effective `max_instances` in database table + - Return created session (executor allocation happens asynchronously in scheduler) + - **Session Loading** (recovery): + - Load session data from database table + - Construct internal `Session` struct with `min_instances` and `max_instances` from table columns + - Restore session state (scheduler will allocate executors as needed in next cycle) + +**8. Scheduler (Rust)** +- **Location**: `session_manager/src/scheduler/` +- **Responsibilities**: + - **Scheduler Loop** (`mod.rs`): + - Runs continuously in background thread + - Every `schedule_interval` milliseconds (typically 1000ms): + - Creates snapshot of current cluster state + - Executes scheduling actions (AllocateAction, DispatchAction, etc.) 
+ - **AllocateAction** (`actions/allocate.rs`): + - Gets all open sessions from snapshot + - Orders sessions by fairshare priority + - For each session: + - Checks `plugins.is_underused(ssn)` - returns true if `allocated < deserved` + - **NEW**: Explicit `max_instances` check - counts actual executors from snapshot to prevent over-allocation + - If underused and below max_instances, finds available nodes and calls `ctx.create_executor(node, ssn)` + - Sessions with `min_instances > 0` will have `deserved >= min_instances * slots` + - Therefore, they will be allocated executors until reaching at least `min_instances` + - Sessions will not exceed `max_instances` even when multiple executors are allocated in one cycle + - **Context** (`ctx.rs`): + - Maintains snapshot of cluster state + - Delegates allocation decisions to plugins + - Calls controller to actually create executors + +**9. FairShare Plugin (Scheduler)** +- **Location**: `session_manager/src/scheduler/plugins/fairshare.rs` +- **Responsibilities**: + - **setup()**: Called at the start of each scheduler cycle + - Calculates `desired` for each session based on pending/running tasks + - Caps desired by session's `max_instances` (already includes app limit from creation) + - Initializes `deserved = min_instances * slots` (guaranteed minimum) + - Distributes remaining cluster resources fairly across sessions + - Ensures: `min_instances * slots <= deserved <= desired <= max_instances * slots` + - **is_underused()**: Called by AllocateAction for each session + - Returns true if `allocated < deserved` + - This ensures sessions reach their guaranteed `min_instances` + - And get fair share of resources beyond that + - **is_allocatable()**: Checks if node has capacity for session's slots + - **ssn_order_fn()**: Orders sessions by fairshare priority + - Update `SSNInfo` struct to include `min_instances` and `max_instances` + +### Data Structures + +**RunnerContext (Updated):** + +```python +@dataclass +class RunnerContext: + execution_object: Any + stateful: bool = False + autoscale: bool = True + # Computed fields + min_instances: int = field(init=False, repr=False) + max_instances: Optional[int] = field(init=False, repr=False) + + def __post_init__(self) -> None: + # Compute min/max instances based on autoscale + if self.autoscale: + self.min_instances = 0 + self.max_instances = None # Unlimited + else: + self.min_instances = 1 + self.max_instances = 1 # Single instance + + # Validation + if self.stateful and inspect.isclass(self.execution_object): + raise ValueError("Cannot set stateful=True for a class. 
Pass an instance instead.")
+```
+
+**FlameRunpyService State (Updated):**
+
+```python
+class FlameRunpyService(FlameService):
+    def __init__(self):
+        self._ssn_ctx: SessionContext = None
+        self._execution_object: Any = None  # Cached execution object
+        self._runner_context: RunnerContext = None  # Configuration
+```
+
+**Session Table Schema (Database):**
+
+Current sessions table schema (from existing migrations):
+```sql
+CREATE TABLE IF NOT EXISTS sessions (
+    id TEXT PRIMARY KEY,
+    application TEXT NOT NULL,
+    slots INTEGER NOT NULL,
+    common_data BLOB,
+    creation_time INTEGER NOT NULL,
+    completion_time INTEGER,
+    state INTEGER NOT NULL,
+    version INTEGER NOT NULL DEFAULT 1
+);
+```
+
+**New Migration Script** (`migrations/sqlite/20260123000000_add_session_instances.sql`):
+
+```sql
+-- Add min_instances and max_instances columns to sessions table for RFE323
+ALTER TABLE sessions
+ADD COLUMN min_instances INTEGER NOT NULL DEFAULT 0;
+
+ALTER TABLE sessions
+ADD COLUMN max_instances INTEGER; -- NULL means unlimited
+
+-- Updated schema after migration:
+-- sessions table will have:
+--   id, application, slots, common_data,
+--   creation_time, completion_time, state, version,
+--   min_instances, max_instances
+```
+
+**SessionDao Struct (Rust - storage layer):**
+
+Current SessionDao in `session_manager/src/storage/engine/types.rs`:
+```rust
+#[derive(Clone, FromRow, Debug)]
+pub struct SessionDao {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: i64,
+    pub version: u32,
+    pub common_data: Option<Vec<u8>>,
+    pub creation_time: i64,
+    pub completion_time: Option<i64>,
+    pub state: i32,
+}
+```
+
+**Updated SessionDao Struct:**
+```rust
+#[derive(Clone, FromRow, Debug)]
+pub struct SessionDao {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: i64,
+    pub version: u32,
+    pub common_data: Option<Vec<u8>>,
+    pub creation_time: i64,
+    pub completion_time: Option<i64>,
+    pub state: i32,
+    pub min_instances: i64, // New field
+    pub max_instances: Option<i64>, // New field, NULL means unlimited
+}
+```
+
+**Updated TryFrom Implementation:**
+```rust
+impl TryFrom<&SessionDao> for Session {
+    type Error = FlameError;
+
+    fn try_from(ssn: &SessionDao) -> Result<Self, Self::Error> {
+        Ok(Self {
+            id: ssn.id.clone(),
+            application: ssn.application.clone(),
+            slots: ssn.slots as u32,
+            version: ssn.version,
+            common_data: ssn.common_data.clone().map(Bytes::from),
+            creation_time: DateTime::<Utc>::from_timestamp(ssn.creation_time, 0)
+                .ok_or(FlameError::Storage("invalid creation time".to_string()))?,
+            completion_time: ssn.completion_time
+                .map(|t| {
+                    DateTime::<Utc>::from_timestamp(t, 0)
+                        .ok_or(FlameError::Storage("invalid completion time".to_string()))
+                })
+                .transpose()?,
+            tasks: HashMap::new(),
+            tasks_index: HashMap::new(),
+            status: SessionStatus {
+                state: ssn.state.try_into()?,
+            },
+            events: vec![],
+            min_instances: ssn.min_instances as u32, // New field conversion
+            max_instances: ssn.max_instances.map(|v| v as u32), // New field conversion
+        })
+    }
+}
+```
+
+**Internal Session Struct (Rust - runtime):**
+
+Current Session in `common/src/apis.rs`:
+```rust
+pub struct Session {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: u32,
+    pub version: u32,
+    pub common_data: Option<CommonData>,
+    pub tasks: HashMap,
+    pub tasks_index: HashMap,
+    pub creation_time: DateTime<Utc>,
+    pub completion_time: Option<DateTime<Utc>>,
+    pub events: Vec,
+    pub status: SessionStatus,
+}
+```
+
+**Updated Internal Session Struct:**
+```rust
+pub struct Session {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: u32,
+    pub version: u32,
+    pub common_data: Option<CommonData>,
+    pub tasks: HashMap,
+    pub tasks_index: HashMap,
+    pub creation_time: DateTime<Utc>,
+    pub completion_time: Option<DateTime<Utc>>,
+    pub events: Vec,
+    pub status: SessionStatus,
+    pub min_instances: u32, // New field
+    pub max_instances: Option<u32>, // New field, None means unlimited (but should be set via fallback)
+}
+```
+
+**SessionInfo Struct (Scheduler Snapshot Model):**
+
+Current SessionInfo in `session_manager/src/model/mod.rs`:
+```rust
+pub struct SessionInfo {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: u32,
+    pub tasks_status: HashMap,
+    pub creation_time: DateTime<Utc>,
+    pub completion_time: Option<DateTime<Utc>>,
+    pub state: SessionState,
+}
+```
+
+**Updated SessionInfo Struct:**
+```rust
+pub struct SessionInfo {
+    pub id: SessionID,
+    pub application: String,
+    pub slots: u32,
+    pub tasks_status: HashMap,
+    pub creation_time: DateTime<Utc>,
+    pub completion_time: Option<DateTime<Utc>>,
+    pub state: SessionState,
+    pub min_instances: u32, // New field
+    pub max_instances: Option<u32>, // New field
+}
+```
+
+**Updated From<&Session> for SessionInfo:**
+```rust
+impl From<&Session> for SessionInfo {
+    fn from(session: &Session) -> Self {
+        // ... existing field mappings ...
+        Self {
+            id: session.id.clone(),
+            application: session.application.clone(),
+            slots: session.slots,
+            tasks_status: calculate_tasks_status(&session.tasks_index),
+            creation_time: session.creation_time,
+            completion_time: session.completion_time,
+            state: session.status.state,
+            min_instances: session.min_instances, // New field mapping
+            max_instances: session.max_instances, // New field mapping
+        }
+    }
+}
+```
+
+**FairShare SSNInfo Struct:**
+
+Current SSNInfo in `session_manager/src/scheduler/plugins/fairshare.rs`:
+```rust
+struct SSNInfo {
+    pub id: SessionID,
+    pub slots: u32,
+    pub desired: f64,
+    pub deserved: f64,
+    pub allocated: f64,
+}
+```
+
+**Updated SSNInfo Struct:**
+```rust
+struct SSNInfo {
+    pub id: SessionID,
+    pub slots: u32,
+    pub desired: f64,
+    pub deserved: f64,
+    pub allocated: f64,
+    pub min_instances: u32, // New field
+    pub max_instances: Option<u32>, // New field (effective max after considering app limit)
+}
+```
+
+**SessionSpec (RPC/Protobuf):**
+
+The `SessionSpec` message in `rpc/protos/types.proto` needs to be updated to include instance configuration fields:
+
+```protobuf
+message SessionSpec {
+  string application = 2;
+  uint32 slots = 3;
+  optional bytes common_data = 4;
+  uint32 min_instances = 5;          // Minimum number of instances (default: 0)
+  optional uint32 max_instances = 6; // Maximum number of instances (null means unlimited)
+}
+```
+
+This is part of the core RPC API interface and will be used by:
+- Session manager to store and retrieve instance configuration
+- SDK clients to create sessions with instance limits
+- `flmctl` to display session configuration
+
+**Session Manager Internal Structures:**
+
+The session manager will use the protobuf-generated types directly, reading `min_instances` and `max_instances` from the `Session` message's `spec` field when loading sessions from the database and when creating new sessions.
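+
+On the loading path, no hand-written column mapping is needed: once the two new columns exist on `SessionDao`, SQLx fills them via the derived `FromRow`, and the `TryFrom<&SessionDao>` conversion above carries them into the runtime `Session`. The sketch below is illustrative only; imports of the project types are omitted, and the actual query text and error handling in `sqlite.rs` may differ.
+
+```rust
+use sqlx::SqlitePool;
+
+// Illustrative sketch; SessionDao, Session, and FlameError are the project types shown above.
+async fn load_sessions(pool: &SqlitePool) -> Result<Vec<Session>, FlameError> {
+    let rows: Vec<SessionDao> = sqlx::query_as::<_, SessionDao>(
+        "SELECT id, application, slots, version, common_data, creation_time,
+                completion_time, state, min_instances, max_instances
+         FROM sessions",
+    )
+    .fetch_all(pool)
+    .await
+    .map_err(|e| FlameError::Storage(e.to_string()))?;
+
+    // Reuse the TryFrom<&SessionDao> conversion, which now includes the new fields.
+    rows.iter().map(Session::try_from).collect()
+}
+```
+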
+ +**Session Creation with Validation (Rust):** + +```rust +// Pseudo-code for session manager +fn create_session(request: CreateSessionRequest) -> Result { + // Step 1: Read min_instances and max_instances from SessionSpec + // These were already populated by the Python SDK from RunnerContext + let min_instances = request.spec.min_instances as usize; + let max_instances = request.spec.max_instances.map(|v| v as usize); + + // Step 2: Validate instance configuration + if let Some(max) = max_instances { + if min_instances > max { + return Err(FlameError::InvalidArgument( + format!("min_instances ({}) must be <= max_instances ({})", + min_instances, max) + )); + } + } + + // Note: No need to extract from RunnerContext in common_data - + // the SDK already populated SessionSpec with these values + + Ok(Session { + metadata: generate_session_metadata(), + spec: request.spec, + status: SessionStatus::default(), + }) +} +``` + +### Algorithms + +**Algorithm 1: Runner.service() with Default Determination** + +```python +def service(self, execution_object, stateful=None, autoscale=None): + # Step 1: Determine execution object type + is_function = callable(execution_object) and not inspect.isclass(execution_object) + is_class = inspect.isclass(execution_object) + is_instance = not is_function and not is_class + + # Step 2: Apply defaults if not specified + if stateful is None: + stateful = False # All types default to False + + if autoscale is None: + if is_function: + autoscale = True + else: # class or instance + autoscale = False + + # Step 3: Validation + if stateful and is_class: + raise ValueError("Cannot set stateful=True for a class") + + # Step 4: Do NOT instantiate classes (keep as-is) + # Old code: if inspect.isclass(execution_object): execution_object = execution_object() + # New code: keep execution_object as class + + # Step 5: Create RunnerContext + runner_context = RunnerContext( + execution_object=execution_object, + stateful=stateful, + autoscale=autoscale + ) + + # Step 6: Serialize and store in cache + serialized = cloudpickle.dumps(runner_context) + object_ref = put_object(session_id, serialized) + + # Step 7: Create session and RunnerService + return RunnerService(self._name, execution_object, stateful, autoscale) +``` + +**Algorithm 2: on_session_enter() with Object Loading** + +```python +def on_session_enter(self, context: SessionContext) -> bool: + # Step 1: Install package if needed + if context.application.url: + self._install_package_from_url(context.application.url) + + # Step 2: Load RunnerContext from common_data + common_data_bytes = context.common_data() + object_ref = ObjectRef.decode(common_data_bytes) + serialized_ctx = get_object(object_ref) + runner_context = cloudpickle.loads(serialized_ctx) + + # Step 3: Store configuration + self._ssn_ctx = context + self._runner_context = runner_context + + # Step 4: Load execution object + execution_object = runner_context.execution_object + + # Step 5: If it's a class, instantiate it + if inspect.isclass(execution_object): + logger.debug(f"Instantiating class {execution_object.__name__}") + execution_object = execution_object() # Use default constructor + + # Step 6: Store execution object for reuse + self._execution_object = execution_object + + logger.info("Session entered successfully, execution object loaded") + return True +``` + +**Algorithm 3: on_task_invoke() with State Persistence** + +```python +def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: + # Step 1: Use cached execution object (not 
from common_data) + execution_object = self._execution_object + + # Step 2: Deserialize RunnerRequest + request = cloudpickle.loads(context.input) + + # Step 3: Resolve arguments + invoke_args = tuple(self._resolve_object_ref(arg) for arg in request.args or ()) + invoke_kwargs = {k: self._resolve_object_ref(v) for k, v in (request.kwargs or {}).items()} + + # Step 4: Execute method or callable + if request.method is None: + result = execution_object(*invoke_args, **invoke_kwargs) + else: + method = getattr(execution_object, request.method) + result = method(*invoke_args, **invoke_kwargs) + + # Step 5: Update execution object if stateful + if self._runner_context.stateful: + logger.debug("Persisting execution object state") + updated_context = RunnerContext( + execution_object=execution_object, # Updated object + stateful=self._runner_context.stateful, + autoscale=self._runner_context.autoscale, + ) + serialized = cloudpickle.dumps(updated_context) + + # Get original ObjectRef and update it + common_data_bytes = self._ssn_ctx.common_data() + object_ref = ObjectRef.decode(common_data_bytes) + update_object(object_ref, serialized) + + # Step 6: Return result + result_ref = put_object(context.session_id, result) + return TaskOutput(result_ref.encode()) +``` + +**Algorithm 4: Session Creation Enhancement** + +**Context**: The existing `create_session()` method in session manager needs to handle the new `min_instances` and `max_instances` fields. + +**Changes Required**: + +```rust +// In session_manager/src/manager.rs (or wherever create_session is implemented) +fn create_session(request: CreateSessionRequest) -> Result { + // NEW: Step 1 - Read min_instances and max_instances from SessionSpec + // These were populated by the Python SDK from RunnerContext + let min_instances = request.spec.min_instances as usize; + let max_instances = request.spec.max_instances.map(|v| v as usize); + + // NEW: Step 2 - Apply fallback logic for max_instances + let application = get_application(&request.spec.application)?; + let effective_max_instances = match max_instances { + Some(max) => Some(max), + None => { + // Fallback to application's max_instances (if it exists) + match application.spec.max_instances { + Some(app_max) => { + tracing::info!( + "Session max_instances is None, using application max_instances: {}", + app_max + ); + Some(app_max as usize) + } + None => { + tracing::info!( + "Both session and application max_instances are None, session is unlimited" + ); + None + } + } + } + }; + + // NEW: Step 3 - Validate min <= max + if let Some(max) = effective_max_instances { + if min_instances > max { + return Err(FlameError::InvalidArgument( + format!("min_instances ({}) must be <= max_instances ({})", + min_instances, max) + )); + } + } + + // NEW: Step 4 - Update SessionSpec with effective max_instances + let mut session_spec = request.spec.clone(); + session_spec.max_instances = effective_max_instances.map(|v| v as u32); + + // MODIFIED: Step 5 - Update INSERT query to include new columns + db.execute( + "INSERT INTO sessions (id, application, slots, common_data, + min_instances, max_instances, ...) + VALUES (?, ?, ?, ?, ?, ?, ...)", + params![ + &session_id, + &session_spec.application, + session_spec.slots as i64, + &session_spec.common_data, + session_spec.min_instances as i64, // NEW + session_spec.max_instances.map(|v| v as i64), // NEW + // ... existing fields ... + ] + )?; + + // MODIFIED: Step 6 - Add new fields to Session struct + let session = Session { + // ... existing fields ... 
+ min_instances: session_spec.min_instances, // NEW + max_instances: session_spec.max_instances, // NEW + // ... existing fields ... + }; + + Ok(session) + // Note: Executor allocation happens asynchronously in scheduler loop +} +``` + +**Algorithm 5: FairShare Plugin Enhancement** + +**Context**: The existing FairShare plugin (`session_manager/src/scheduler/plugins/fairshare.rs`) calculates how many executors each session deserves. This shows the NEW/MODIFIED logic for min/max instances. + +**Changes Required in `setup()` method**: + +```rust +// In fairshare.rs setup() method +fn setup(&mut self, ss: &SnapShot) -> Result<(), FlameError> { + // ... existing code: load sessions, apps, calculate base desired from tasks ... + + for ssn in open_ssns.values() { + // Existing: Calculate desired from pending/running tasks + let mut desired = calculate_desired_from_tasks(ssn); + + if let Some(app) = apps.get(&ssn.application) { + // NEW: Cap desired by session's max_instances + // Note: ssn.max_instances already includes app limit (from session creation) + if let Some(max_instances) = ssn.max_instances { + desired = desired.min((max_instances * ssn.slots) as f64); + } + + // NEW: Ensure desired is at least min_instances + let min_allocation = (ssn.min_instances * ssn.slots) as f64; + desired = desired.max(min_allocation); + + // MODIFIED: Store SSNInfo with new fields + self.ssn_map.insert( + ssn.id.clone(), + SSNInfo { + id: ssn.id.clone(), + desired, + deserved: min_allocation, // NEW: Initialize to min_instances + slots: ssn.slots, + min_instances: ssn.min_instances, // NEW field + max_instances: ssn.max_instances, // NEW field + allocated: 0.0, + }, + ); + } + } + + // ... existing code: calculate allocated from executors ... + + // NEW: Reserve slots for guaranteed minimums before fair distribution + let mut remaining_slots = total_cluster_slots; + for ssn in self.ssn_map.values() { + let min_allocation = (ssn.min_instances * ssn.slots) as f64; + remaining_slots -= min_allocation; + } + + // Existing: Distribute remaining_slots fairly + // (existing fair distribution loop works as-is, but now respects + // deserved >= min_instances from initialization above) + // ... + + Ok(()) +} +``` + +**SSNInfo struct update** (in `fairshare.rs`): + +```rust +struct SSNInfo { + // ... existing fields ... + min_instances: u32, // NEW + max_instances: Option, // NEW +} +``` + +**Note**: The existing fair distribution loop works unchanged, because: +- `deserved` starts at `min_instances` (guaranteed) +- `desired` is capped by `max_instances` +- Fair distribution only gives more if `deserved < desired` + +**Algorithm 6: AllocateAction Enhancement (Scheduler Loop)** + +**Context**: The existing AllocateAction (`session_manager/src/scheduler/actions/allocate.rs`) iterates through open sessions, checks if they're underused via `ctx.is_underused()`, and allocates executors. This algorithm shows only the NEW logic needed. + +**Change Required**: Add max_instances check after the `is_underused()` check (around line 65-67): + +```rust +// Existing code (line ~65) +if !ctx.is_underused(&ssn)? { + continue; +} + +// NEW: Add this check before pipeline check +// Explicit max_instances check (safety guard) +// Note: The fairshare plugin caches allocated count from snapshot +// at the start of the cycle. Within a single cycle, if we allocate +// multiple executors, the cached count doesn't update. To prevent +// over-allocation, we count actual executors from the current snapshot. 
+if let Some(max_instances) = ssn.max_instances { + let current_executors = ss.find_executors_by_session(&ssn.id)?; + let current_count = current_executors.len(); + if current_count >= max_instances as usize { + tracing::debug!( + "Session <{}> has reached max_instances limit: {} >= {}", + ssn.id, current_count, max_instances + ); + continue; // Already at max limit + } +} + +// Existing code continues (pipeline check, node iteration, etc.) +``` + +**Key Points:** +- Runs periodically in scheduler loop (every ~1 second) +- Uses fairshare plugin's `is_underused()` to determine if session needs more executors +- For sessions with `min_instances > 0`, `is_underused()` returns true until `allocated >= min_instances * slots` +- **Explicit max_instances check**: Counts actual executors from snapshot to prevent over-allocation (Step 5) + - This is needed because fairshare's cached `allocated` count doesn't update within a single scheduler cycle + - Prevents allocating beyond `max_instances` when creating multiple executors in one cycle +- Creates one executor per iteration, then re-evaluates +- Sessions are processed in fairshare priority order + +### System Considerations + +**Performance:** +- **Improved**: Loading execution object once per session (not per task) reduces overhead +- **Serialization**: Class pickling is more efficient than instance pickling +- **Cache Access**: Stateful services incur cache update cost after each task +- **Expected Impact**: 10-20% reduction in task invocation latency for stateful services + +**Scalability:** +- **Autoscaling**: Services can scale from 0 to unlimited instances based on workload +- **Single Instance**: Services that require coordination can use autoscale=False +- **Executor Allocation**: Session manager dynamically allocates executors via fairshare plugin +- **Fair Sharing**: FairShare plugin ensures fair allocation across sessions while respecting individual session limits +- **Minimum Guarantee**: Sessions with `min_instances > 0` are guaranteed that many executors +- **Maximum Limit**: Session's `max_instances` serves as the effective limit (includes application's limit as fallback during creation) +- **Limitation**: No scale-down mechanism (min_instances remain allocated) + +**Reliability:** +- **State Persistence**: Stateful services can recover state from cache +- **Failure Handling**: If executor fails, state persisted in cache can be loaded by new executor +- **Consistency**: No version checking; last-write-wins for state updates +- **Risk**: Concurrent updates to stateful services may cause race conditions + +**Resource Usage:** +- **Memory**: Execution objects cached in memory per session +- **Disk**: Stateful services persist to cache storage +- **Network**: State updates require cache communication +- **Executors**: Autoscaling services may use more executors + +**Security:** +- **Code Execution**: Execution objects are pickled and unpickled (existing risk) +- **State Tampering**: Cache objects could be modified externally (existing risk) +- **Validation**: No additional security measures in this RFE + +**Observability:** +- **Logging**: Log when execution object is loaded, instantiated, and persisted +- **Metrics**: Track cache updates for stateful services +- **Debugging**: Log stateful/autoscale configuration when session is created + +**Operational:** +- **Deployment**: Requires database migration to add `min_instances` and `max_instances` columns to session table +- **Database Migration**: + - Migration file: 
`session_manager/migrations/sqlite/20260123000000_add_session_instances.sql` + - Add `min_instances INTEGER NOT NULL DEFAULT 0` column + - Add `max_instances INTEGER` column (NULL for unlimited) + - Existing sessions will automatically get default values (min=0, max=NULL) upon migration + - Migration can be applied before deploying new session manager (forward-compatible) + - No downtime required - SQLx handles schema evolution +- **Code Updates**: + - Update `SessionDao` struct in `session_manager/src/storage/engine/types.rs` + - Update `Session` struct in `common/src/apis.rs` + - Update INSERT query in `session_manager/src/storage/engine/sqlite.rs` + - Update `TryFrom` implementation to map new fields +- **Configuration**: New parameters are optional with sensible defaults +- **Migration Path**: Existing code continues to work unchanged with defaults +- **Monitoring**: Monitor executor allocation for autoscaling services +- **Data Persistence**: Session configuration survives session manager restarts + +### Dependencies + +**External Dependencies:** +- `cloudpickle`: For serialization (existing) +- `inspect`: For type detection (existing) + +**Internal Dependencies:** +- `rpc/protos/types.proto`: Core RPC API definition (SessionSpec message) +- `flamepy.core.cache`: For object persistence +- `flamepy.core.types`: For FlameContext +- Session manager: For executor allocation logic +- Protobuf compiler: For generating language bindings from proto files + +**Version Requirements:** +- Python 3.8+: For type hints and dataclass features +- Rust 1.70+: For session manager updates + +## 4. Use Cases + +### Basic Use Cases + +**Example 1: Stateless Function with Autoscaling** + +```python +# Define a stateless function +def process_request(data): + return {"result": data * 2} + +with Runner("my-app") as runner: + # Default behavior: stateful=False, autoscale=True + service = runner.service(process_request) + + # Submit many tasks, they will autoscale + futures = [service(i) for i in range(1000)] + results = runner.get(futures) +``` + +- **Description**: A stateless request handler that benefits from autoscaling +- **Configuration**: `stateful=False` (default), `autoscale=True` (default) +- **Behavior**: Min 0 instances, max unlimited. Executors created as tasks arrive. +- **Expected outcome**: Efficient parallel processing with automatic scaling + +**Example 2: Stateful Object without Autoscaling** + +```python +# Define a stateful counter class +class Counter: + def __init__(self): + self.count = 0 + + def increment(self): + self.count += 1 + return self.count + +with Runner("counter-app") as runner: + # Explicitly configure: stateful=True, autoscale=False + counter = Counter() + service = runner.service(counter, stateful=True, autoscale=False) + + # All tasks go to the same instance, state is persisted + service.increment() # returns 1 + service.increment() # returns 2 + service.increment() # returns 3 +``` + +- **Description**: A stateful counter that maintains state across tasks +- **Configuration**: `stateful=True`, `autoscale=False` +- **Behavior**: Min 1 instance, max 1 instance. State persisted after each task. 
+- **Expected outcome**: Sequential execution with persistent state
+
+**Example 3: Class with Default Constructor**
+
+```python
+# Define a class
+class DataProcessor:
+    def __init__(self):
+        self.processed_count = 0
+
+    def process(self, data):
+        self.processed_count += 1
+        return f"Processed {data}, total: {self.processed_count}"
+
+with Runner("processor-app") as runner:
+    # Pass the class itself (not an instance)
+    service = runner.service(DataProcessor, autoscale=False)
+
+    # The class will be instantiated on the executor
+    result1 = service.process("data1").get()  # "Processed data1, total: 1"
+    result2 = service.process("data2").get()  # "Processed data2, total: 2"
+```
+
+- **Description**: A class is passed to `service()` and instantiated on the executor
+- **Configuration**: `stateful=False` (default), `autoscale=False`
+- **Behavior**: Min 1 instance, max 1 instance. Class instantiated in `on_session_enter`.
+- **Expected outcome**: Single instance, but state is not persisted (resets if executor fails)
+
+### Advanced Use Cases
+
+**Example 4: Database Connection Pool**
+
+```python
+class ConnectionPool:
+    def __init__(self):
+        self.pool = create_connection_pool(size=10)
+
+    def execute_query(self, query):
+        with self.pool.get_connection() as conn:
+            return conn.execute(query)
+
+with Runner("db-app") as runner:
+    # Single instance, no autoscaling
+    pool = ConnectionPool()
+    service = runner.service(pool, stateful=False, autoscale=False)
+
+    # All queries use the same connection pool
+    result = service.execute_query("SELECT * FROM users").get()
+```
+
+- **Description**: A connection pool that should not be duplicated
+- **Configuration**: `stateful=False`, `autoscale=False`
+- **Behavior**: Min 1 instance, max 1 instance. Maintains connection pool.
+- **Expected outcome**: Efficient resource usage with single pool
+
+**Example 5: Distributed Counter with State Persistence**
+
+```python
+class DistributedCounter:
+    def __init__(self):
+        self.counts = {}
+
+    def increment(self, key):
+        self.counts[key] = self.counts.get(key, 0) + 1
+        return self.counts[key]
+
+with Runner("dist-counter") as runner:
+    counter = DistributedCounter()
+    service = runner.service(counter, stateful=True, autoscale=False)
+
+    # State persisted after each task
+    service.increment("user1")
+    service.increment("user2")
+    service.increment("user1")
+
+    # If executor fails and restarts, state is recovered from cache
+```
+
+- **Description**: A distributed counter with persistent state
+- **Configuration**: `stateful=True`, `autoscale=False`
+- **Behavior**: Min 1 instance, max 1 instance. State persisted to cache.
+- **Expected outcome**: State survives executor failures
+
+**Example 6: Parallel Image Processing**
+
+```python
+def process_image(image_data):
+    # CPU-intensive image processing
+    return transformed_image
+
+with Runner("image-processor") as runner:
+    # Autoscale to handle large batch
+    service = runner.service(process_image)  # stateful=False, autoscale=True
+
+    # Submit 10,000 images
+    futures = [service(img) for img in images]
+
+    # Executors scale up to handle workload
+    results = runner.get(futures)
+```
+
+- **Description**: Parallel image processing with autoscaling
+- **Configuration**: `stateful=False` (default), `autoscale=True` (default)
+- **Behavior**: Min 0 instances, max unlimited. Scales with workload.
+- **Expected outcome**: Fast parallel processing with automatic resource allocation
+
+## 5. 
References + +### Related Documents +- RFE323 GitHub Issue: https://github.com/xflops/flame/issues/323 +- RFE280: Initial Runner implementation +- RFE284: flmrun application +- RFE318: Apache Arrow-Based Object Cache + +### External References +- Python `inspect` module documentation: https://docs.python.org/3/library/inspect.html +- `cloudpickle` documentation: https://github.com/cloudpipe/cloudpickle + +### Implementation References + +**RPC API:** +- RPC API definition: `rpc/protos/types.proto` (SessionSpec message) + +**Python SDK:** +- Runner implementation: `sdk/python/src/flamepy/rl/runner.py` +- RunnerService: `sdk/python/src/flamepy/rl/runpy.py` +- RunnerContext and types: `sdk/python/src/flamepy/rl/types.py` +- Object cache: `sdk/python/src/flamepy/core/cache.py` +- Core client: `sdk/python/src/flamepy/core/client.py` (create_session) + +**Session Manager:** +- Migration script: `session_manager/migrations/sqlite/20260123000000_add_session_instances.sql` +- SessionDao struct: `session_manager/src/storage/engine/types.rs` +- SQLite storage: `session_manager/src/storage/engine/sqlite.rs` (INSERT query around line 384) +- SessionInfo struct: `session_manager/src/model/mod.rs` +- Session manager: `session_manager/src/manager.rs` +- Scheduler: `session_manager/src/scheduler/` (main scheduler logic) +- AllocateAction: `session_manager/src/scheduler/actions/allocate.rs` (add max_instances check) +- FairShare plugin: `session_manager/src/scheduler/plugins/fairshare.rs` +- Plugin interface: `session_manager/src/scheduler/plugins/mod.rs` + +**Common:** +- Internal Session struct: `common/src/apis.rs` + +**Tools:** +- flmctl view command: `cmd/flmctl/` (session view implementation) diff --git a/e2e/tests/test_runner.py b/e2e/tests/test_runner.py index 20af9e03..2c4c5f5b 100644 --- a/e2e/tests/test_runner.py +++ b/e2e/tests/test_runner.py @@ -286,3 +286,105 @@ def test_runner_error_no_package_config(): pass assert exc_info.value.code == flamepy.FlameErrorCode.INVALID_CONFIG + + +def test_runner_stateful_instance(check_package_config, check_flmrun_app): + """Test Case 14: Test Runner with stateful=True for instance.""" + with rl.Runner("test-runner-stateful") as rr: + # Create a Counter instance + counter = Counter() + + # Create a stateful service (state should persist across tasks) + cnt_service = rr.service(counter, stateful=True, autoscale=False) + + # Call methods + cnt_service.add(5).wait() + cnt_service.increment().wait() + result = cnt_service.get_count() + + # Get the result + value = result.get() + assert value == 6, f"Expected 6, got {value}" + + +def test_runner_stateless_function(check_package_config, check_flmrun_app): + """Test Case 15: Test Runner with stateless function (default behavior).""" + with rl.Runner("test-runner-stateless-func") as rr: + # Create a service with a function (stateless by default) + sum_service = rr.service(sum_func, stateful=False, autoscale=True) + + # Call the function multiple times + results = [sum_service(i, i+1) for i in range(5)] + values = rr.get(results) + + # Verify results + expected = [1, 3, 5, 7, 9] + assert values == expected, f"Expected {expected}, got {values}" + + +def test_runner_class_single_instance(check_package_config, check_flmrun_app): + """Test Case 16: Test Runner with class and autoscale=False (single instance).""" + with rl.Runner("test-runner-class-single") as rr: + # Create a service with a class, single instance mode + calc_service = rr.service(Calculator, stateful=False, autoscale=False) + + # Call methods + result1 = 
calc_service.add(10, 5) + result2 = calc_service.multiply(3, 4) + + values = rr.get([result1, result2]) + assert values == [15, 12], f"Expected [15, 12], got {values}" + + +def test_runner_error_stateful_class(check_package_config, check_flmrun_app): + """Test Case 17: Test that stateful=True raises error for class.""" + with rl.Runner("test-runner-stateful-class-error") as rr: + # Trying to create a stateful service with a class should raise ValueError + with pytest.raises(ValueError) as exc_info: + rr.service(Counter, stateful=True) + + assert "Cannot set stateful=True for a class" in str(exc_info.value) + + +def test_runner_defaults_function(check_package_config, check_flmrun_app): + """Test Case 18: Test default parameters for function (stateful=False, autoscale=True).""" + with rl.Runner("test-runner-defaults-func") as rr: + # Create service with defaults (should be stateful=False, autoscale=True) + sum_service = rr.service(sum_func) + + # Verify it works (defaults should be applied automatically) + result = sum_service(100, 200) + value = result.get() + assert value == 300, f"Expected 300, got {value}" + + +def test_runner_defaults_class(check_package_config, check_flmrun_app): + """Test Case 19: Test default parameters for class (stateful=False, autoscale=False).""" + with rl.Runner("test-runner-defaults-class") as rr: + # Create service with class using defaults (should be stateful=False, autoscale=False) + counter_service = rr.service(Counter) + + # Call methods + counter_service.add(10).wait() + counter_service.increment().wait() + result = counter_service.get_count() + + value = result.get() + assert value == 11, f"Expected 11, got {value}" + + +def test_runner_defaults_instance(check_package_config, check_flmrun_app): + """Test Case 20: Test default parameters for instance (stateful=False, autoscale=False).""" + with rl.Runner("test-runner-defaults-instance") as rr: + # Create an instance + calc = Calculator() + + # Create service with instance using defaults (should be stateful=False, autoscale=False) + calc_service = rr.service(calc) + + # Call methods + result1 = calc_service.add(5, 3) + result2 = calc_service.subtract(10, 4) + + values = rr.get([result1, result2]) + assert values == [8, 6], f"Expected [8, 6], got {values}" diff --git a/examples/pi/rust/src/client.rs b/examples/pi/rust/src/client.rs index c9a4beb4..5cad6162 100644 --- a/examples/pi/rust/src/client.rs +++ b/examples/pi/rust/src/client.rs @@ -64,6 +64,8 @@ async fn main() -> Result<(), Box> { application: app, slots, common_data: None, + min_instances: 0, // Default: no minimum guarantee + max_instances: None, // Default: unlimited }) .await?; diff --git a/flmctl/src/create.rs b/flmctl/src/create.rs index 7dfd0da6..ef2e5e06 100644 --- a/flmctl/src/create.rs +++ b/flmctl/src/create.rs @@ -24,6 +24,8 @@ pub async fn run(ctx: &FlameContext, app: &str, slots: &u32) -> Result<(), Box Result<(), Box> { application: DEFAULT_APP.to_string(), slots, common_data: None, + min_instances: 0, // Default: no minimum guarantee + max_instances: None, // Default: unlimited }; let ssn = conn.create_session(&ssn_attr).await?; let ssn_creation_end_time = Local::now(); diff --git a/flmping/src/client.rs b/flmping/src/client.rs index 4a7bfdce..00e15b86 100644 --- a/flmping/src/client.rs +++ b/flmping/src/client.rs @@ -83,6 +83,8 @@ async fn main() -> Result<(), Box> { application: DEFAULT_APP.to_string(), slots, common_data: cli.common_data.map(|s| s.into()), + min_instances: 0, // Default: no minimum guarantee + max_instances: 
None, // Default: unlimited }; let ssn = conn.create_session(&ssn_attr).await?; let ssn_creation_end_time = Instant::now(); diff --git a/rpc/protos/types.proto b/rpc/protos/types.proto index e943a8a8..8fde8d2d 100644 --- a/rpc/protos/types.proto +++ b/rpc/protos/types.proto @@ -32,6 +32,8 @@ message SessionSpec { string application = 2; uint32 slots = 3; optional bytes common_data = 4; + uint32 min_instances = 5; // Minimum number of instances (default: 0) + optional uint32 max_instances = 6; // Maximum number of instances (null means unlimited) } message Session { diff --git a/sdk/python/protos/types.proto b/sdk/python/protos/types.proto index e943a8a8..8fde8d2d 100644 --- a/sdk/python/protos/types.proto +++ b/sdk/python/protos/types.proto @@ -32,6 +32,8 @@ message SessionSpec { string application = 2; uint32 slots = 3; optional bytes common_data = 4; + uint32 min_instances = 5; // Minimum number of instances (default: 0) + optional uint32 max_instances = 6; // Maximum number of instances (null means unlimited) } message Session { diff --git a/sdk/python/src/flamepy/core/client.py b/sdk/python/src/flamepy/core/client.py index a8520d87..623fd40f 100644 --- a/sdk/python/src/flamepy/core/client.py +++ b/sdk/python/src/flamepy/core/client.py @@ -62,7 +62,7 @@ def connect(addr: str) -> "Connection": return Connection.connect(addr) -def create_session(application: str, common_data: Optional[bytes] = None, session_id: Optional[str] = None, slots: int = 1) -> "Session": +def create_session(application: str, common_data: Optional[bytes] = None, session_id: Optional[str] = None, slots: int = 1, min_instances: int = 0, max_instances: Optional[int] = None) -> "Session": """Create a new session. Args: @@ -70,9 +70,11 @@ def create_session(application: str, common_data: Optional[bytes] = None, sessio common_data: Common data as bytes (core API works with bytes) session_id: Optional session ID slots: Number of slots + min_instances: Minimum number of instances (default: 0) + max_instances: Maximum number of instances (None = unlimited) """ conn = ConnectionInstance.instance() - return conn.create_session(SessionAttributes(id=session_id, application=application, common_data=common_data, slots=slots)) + return conn.create_session(SessionAttributes(id=session_id, application=application, common_data=common_data, slots=slots, min_instances=min_instances, max_instances=max_instances)) def open_session(session_id: SessionID) -> "Session": @@ -333,6 +335,8 @@ def create_session(self, attrs: SessionAttributes) -> "Session": application=attrs.application, slots=attrs.slots, common_data=common_data_bytes, + min_instances=attrs.min_instances, + max_instances=attrs.max_instances if attrs.max_instances is not None else None, ) request = CreateSessionRequest(session_id=session_id, session=session_spec) diff --git a/sdk/python/src/flamepy/core/types.py b/sdk/python/src/flamepy/core/types.py index 4027f6c5..c5dd328c 100644 --- a/sdk/python/src/flamepy/core/types.py +++ b/sdk/python/src/flamepy/core/types.py @@ -103,6 +103,8 @@ class SessionAttributes: slots: int id: Optional[str] = None common_data: Any = None + min_instances: int = 0 # Minimum number of instances (default: 0) + max_instances: Optional[int] = None # Maximum number of instances (None = unlimited) @dataclass diff --git a/sdk/python/src/flamepy/proto/types_pb2.py b/sdk/python/src/flamepy/proto/types_pb2.py index ca777d63..27c45e5f 100644 --- a/sdk/python/src/flamepy/proto/types_pb2.py +++ b/sdk/python/src/flamepy/proto/types_pb2.py @@ -24,7 +24,7 @@ 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0btypes.proto\x12\x05\x66lame\"$\n\x08Metadata\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xdd\x01\n\rSessionStatus\x12\"\n\x05state\x18\x01 \x01(\x0e\x32\x13.flame.SessionState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\x12\x1c\n\x0f\x63ompletion_time\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x0f\n\x07pending\x18\x04 \x01(\x05\x12\x0f\n\x07running\x18\x05 \x01(\x05\x12\x0f\n\x07succeed\x18\x06 \x01(\x05\x12\x0e\n\x06\x66\x61iled\x18\x07 \x01(\x05\x12\x1c\n\x06\x65vents\x18\x08 \x03(\x0b\x32\x0c.flame.EventB\x12\n\x10_completion_time\"[\n\x0bSessionSpec\x12\x13\n\x0b\x61pplication\x18\x02 \x01(\t\x12\r\n\x05slots\x18\x03 \x01(\r\x12\x18\n\x0b\x63ommon_data\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x42\x0e\n\x0c_common_data\"t\n\x07Session\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12 \n\x04spec\x18\x02 \x01(\x0b\x32\x12.flame.SessionSpec\x12$\n\x06status\x18\x03 \x01(\x0b\x32\x14.flame.SessionStatus\"\x94\x01\n\nTaskStatus\x12\x1f\n\x05state\x18\x01 \x01(\x0e\x32\x10.flame.TaskState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\x12\x1c\n\x0f\x63ompletion_time\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x1c\n\x06\x65vents\x18\x04 \x03(\x0b\x32\x0c.flame.EventB\x12\n\x10_completion_time\"\\\n\x08TaskSpec\x12\x12\n\nsession_id\x18\x02 \x01(\t\x12\x12\n\x05input\x18\x03 \x01(\x0cH\x00\x88\x01\x01\x12\x13\n\x06output\x18\x04 \x01(\x0cH\x01\x88\x01\x01\x42\x08\n\x06_inputB\t\n\x07_output\"k\n\x04Task\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12\x1d\n\x04spec\x18\x02 \x01(\x0b\x32\x0f.flame.TaskSpec\x12!\n\x06status\x18\x03 \x01(\x0b\x32\x11.flame.TaskStatus\"R\n\x11\x41pplicationStatus\x12&\n\x05state\x18\x01 \x01(\x0e\x32\x17.flame.ApplicationState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\"*\n\x0b\x45nvironment\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"{\n\x11\x41pplicationSchema\x12\x12\n\x05input\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x13\n\x06output\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x18\n\x0b\x63ommon_data\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x08\n\x06_inputB\t\n\x07_outputB\x0e\n\x0c_common_data\"\xc9\x03\n\x0f\x41pplicationSpec\x12\x19\n\x04shim\x18\x01 \x01(\x0e\x32\x0b.flame.Shim\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x0e\n\x06labels\x18\x03 \x03(\t\x12\x12\n\x05image\x18\x04 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07\x63ommand\x18\x05 \x01(\tH\x02\x88\x01\x01\x12\x11\n\targuments\x18\x06 \x03(\t\x12(\n\x0c\x65nvironments\x18\x07 \x03(\x0b\x32\x12.flame.Environment\x12\x1e\n\x11working_directory\x18\x08 \x01(\tH\x03\x88\x01\x01\x12\x1a\n\rmax_instances\x18\t \x01(\rH\x04\x88\x01\x01\x12\x1a\n\rdelay_release\x18\n \x01(\x03H\x05\x88\x01\x01\x12-\n\x06schema\x18\x0b \x01(\x0b\x32\x18.flame.ApplicationSchemaH\x06\x88\x01\x01\x12\x10\n\x03url\x18\x0c \x01(\tH\x07\x88\x01\x01\x42\x0e\n\x0c_descriptionB\x08\n\x06_imageB\n\n\x08_commandB\x14\n\x12_working_directoryB\x10\n\x0e_max_instancesB\x10\n\x0e_delay_releaseB\t\n\x07_schemaB\x06\n\x04_url\"\x80\x01\n\x0b\x41pplication\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12$\n\x04spec\x18\x02 \x01(\x0b\x32\x16.flame.ApplicationSpec\x12(\n\x06status\x18\x03 \x01(\x0b\x32\x18.flame.ApplicationStatus\"W\n\x0c\x45xecutorSpec\x12\x0c\n\x04node\x18\x01 \x01(\t\x12*\n\x06resreq\x18\x02 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12\r\n\x05slots\x18\x03 \x01(\r\"]\n\x0e\x45xecutorStatus\x12#\n\x05state\x18\x01 \x01(\x0e\x32\x14.flame.ExecutorState\x12\x17\n\nsession_id\x18\x02 
\x01(\tH\x00\x88\x01\x01\x42\r\n\x0b_session_id\"w\n\x08\x45xecutor\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12!\n\x04spec\x18\x02 \x01(\x0b\x32\x13.flame.ExecutorSpec\x12%\n\x06status\x18\x03 \x01(\x0b\x32\x15.flame.ExecutorStatus\"2\n\x0c\x45xecutorList\x12\"\n\texecutors\x18\x01 \x03(\x0b\x32\x0f.flame.Executor\"/\n\x0bSessionList\x12 \n\x08sessions\x18\x01 \x03(\x0b\x32\x0e.flame.Session\";\n\x0f\x41pplicationList\x12(\n\x0c\x61pplications\x18\x01 \x03(\x0b\x32\x12.flame.Application\"2\n\x13ResourceRequirement\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x04\x12\x0e\n\x06memory\x18\x02 \x01(\x04\"\n\n\x08NodeSpec\"$\n\x08NodeInfo\x12\x0c\n\x04\x61rch\x18\x01 \x01(\t\x12\n\n\x02os\x18\x02 \x01(\t\"\xab\x01\n\nNodeStatus\x12\x1f\n\x05state\x18\x01 \x01(\x0e\x32\x10.flame.NodeState\x12,\n\x08\x63\x61pacity\x18\x02 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12/\n\x0b\x61llocatable\x18\x03 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12\x1d\n\x04info\x18\x04 \x01(\x0b\x32\x0f.flame.NodeInfo\"k\n\x04Node\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12\x1d\n\x04spec\x18\x02 \x01(\x0b\x32\x0f.flame.NodeSpec\x12!\n\x06status\x18\x03 \x01(\x0b\x32\x11.flame.NodeStatus\"?\n\x06Result\x12\x13\n\x0breturn_code\x18\x01 \x01(\x05\x12\x14\n\x07message\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\n\n\x08_message\"c\n\nTaskResult\x12\x13\n\x0breturn_code\x18\x01 \x01(\x05\x12\x13\n\x06output\x18\x02 \x01(\x0cH\x00\x88\x01\x01\x12\x14\n\x07message\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\t\n\x07_outputB\n\n\x08_message\"\x0e\n\x0c\x45mptyRequest\"N\n\x05\x45vent\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x14\n\x07message\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x15\n\rcreation_time\x18\x03 \x01(\x03\x42\n\n\x08_message*$\n\x0cSessionState\x12\x08\n\x04Open\x10\x00\x12\n\n\x06\x43losed\x10\x01*>\n\tTaskState\x12\x0b\n\x07Pending\x10\x00\x12\x0b\n\x07Running\x10\x01\x12\x0b\n\x07Succeed\x10\x02\x12\n\n\x06\x46\x61iled\x10\x03*\x1a\n\x04Shim\x12\x08\n\x04Host\x10\x00\x12\x08\n\x04Wasm\x10\x01*-\n\x10\x41pplicationState\x12\x0b\n\x07\x45nabled\x10\x00\x12\x0c\n\x08\x44isabled\x10\x01*\xb4\x01\n\rExecutorState\x12\x13\n\x0f\x45xecutorUnknown\x10\x00\x12\x10\n\x0c\x45xecutorVoid\x10\x01\x12\x10\n\x0c\x45xecutorIdle\x10\x02\x12\x13\n\x0f\x45xecutorBinding\x10\x03\x12\x11\n\rExecutorBound\x10\x04\x12\x15\n\x11\x45xecutorUnbinding\x10\x05\x12\x15\n\x11\x45xecutorReleasing\x10\x06\x12\x14\n\x10\x45xecutorReleased\x10\x07*1\n\tNodeState\x12\x0b\n\x07Unknown\x10\x00\x12\t\n\x05Ready\x10\x01\x12\x0c\n\x08NotReady\x10\x02\x42&Z$github.com/flame-sh/flame/sdk/go/rpcb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0btypes.proto\x12\x05\x66lame\"$\n\x08Metadata\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xdd\x01\n\rSessionStatus\x12\"\n\x05state\x18\x01 \x01(\x0e\x32\x13.flame.SessionState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\x12\x1c\n\x0f\x63ompletion_time\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x0f\n\x07pending\x18\x04 \x01(\x05\x12\x0f\n\x07running\x18\x05 \x01(\x05\x12\x0f\n\x07succeed\x18\x06 \x01(\x05\x12\x0e\n\x06\x66\x61iled\x18\x07 \x01(\x05\x12\x1c\n\x06\x65vents\x18\x08 \x03(\x0b\x32\x0c.flame.EventB\x12\n\x10_completion_time\"\xa0\x01\n\x0bSessionSpec\x12\x13\n\x0b\x61pplication\x18\x02 \x01(\t\x12\r\n\x05slots\x18\x03 \x01(\r\x12\x18\n\x0b\x63ommon_data\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x15\n\rmin_instances\x18\x05 \x01(\r\x12\x1a\n\rmax_instances\x18\x06 
\x01(\rH\x01\x88\x01\x01\x42\x0e\n\x0c_common_dataB\x10\n\x0e_max_instances\"t\n\x07Session\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12 \n\x04spec\x18\x02 \x01(\x0b\x32\x12.flame.SessionSpec\x12$\n\x06status\x18\x03 \x01(\x0b\x32\x14.flame.SessionStatus\"\x94\x01\n\nTaskStatus\x12\x1f\n\x05state\x18\x01 \x01(\x0e\x32\x10.flame.TaskState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\x12\x1c\n\x0f\x63ompletion_time\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x1c\n\x06\x65vents\x18\x04 \x03(\x0b\x32\x0c.flame.EventB\x12\n\x10_completion_time\"\\\n\x08TaskSpec\x12\x12\n\nsession_id\x18\x02 \x01(\t\x12\x12\n\x05input\x18\x03 \x01(\x0cH\x00\x88\x01\x01\x12\x13\n\x06output\x18\x04 \x01(\x0cH\x01\x88\x01\x01\x42\x08\n\x06_inputB\t\n\x07_output\"k\n\x04Task\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12\x1d\n\x04spec\x18\x02 \x01(\x0b\x32\x0f.flame.TaskSpec\x12!\n\x06status\x18\x03 \x01(\x0b\x32\x11.flame.TaskStatus\"R\n\x11\x41pplicationStatus\x12&\n\x05state\x18\x01 \x01(\x0e\x32\x17.flame.ApplicationState\x12\x15\n\rcreation_time\x18\x02 \x01(\x03\"*\n\x0b\x45nvironment\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"{\n\x11\x41pplicationSchema\x12\x12\n\x05input\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x13\n\x06output\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x18\n\x0b\x63ommon_data\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x08\n\x06_inputB\t\n\x07_outputB\x0e\n\x0c_common_data\"\xc9\x03\n\x0f\x41pplicationSpec\x12\x19\n\x04shim\x18\x01 \x01(\x0e\x32\x0b.flame.Shim\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x0e\n\x06labels\x18\x03 \x03(\t\x12\x12\n\x05image\x18\x04 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07\x63ommand\x18\x05 \x01(\tH\x02\x88\x01\x01\x12\x11\n\targuments\x18\x06 \x03(\t\x12(\n\x0c\x65nvironments\x18\x07 \x03(\x0b\x32\x12.flame.Environment\x12\x1e\n\x11working_directory\x18\x08 \x01(\tH\x03\x88\x01\x01\x12\x1a\n\rmax_instances\x18\t \x01(\rH\x04\x88\x01\x01\x12\x1a\n\rdelay_release\x18\n \x01(\x03H\x05\x88\x01\x01\x12-\n\x06schema\x18\x0b \x01(\x0b\x32\x18.flame.ApplicationSchemaH\x06\x88\x01\x01\x12\x10\n\x03url\x18\x0c \x01(\tH\x07\x88\x01\x01\x42\x0e\n\x0c_descriptionB\x08\n\x06_imageB\n\n\x08_commandB\x14\n\x12_working_directoryB\x10\n\x0e_max_instancesB\x10\n\x0e_delay_releaseB\t\n\x07_schemaB\x06\n\x04_url\"\x80\x01\n\x0b\x41pplication\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12$\n\x04spec\x18\x02 \x01(\x0b\x32\x16.flame.ApplicationSpec\x12(\n\x06status\x18\x03 \x01(\x0b\x32\x18.flame.ApplicationStatus\"W\n\x0c\x45xecutorSpec\x12\x0c\n\x04node\x18\x01 \x01(\t\x12*\n\x06resreq\x18\x02 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12\r\n\x05slots\x18\x03 \x01(\r\"]\n\x0e\x45xecutorStatus\x12#\n\x05state\x18\x01 \x01(\x0e\x32\x14.flame.ExecutorState\x12\x17\n\nsession_id\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\r\n\x0b_session_id\"w\n\x08\x45xecutor\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12!\n\x04spec\x18\x02 \x01(\x0b\x32\x13.flame.ExecutorSpec\x12%\n\x06status\x18\x03 \x01(\x0b\x32\x15.flame.ExecutorStatus\"2\n\x0c\x45xecutorList\x12\"\n\texecutors\x18\x01 \x03(\x0b\x32\x0f.flame.Executor\"/\n\x0bSessionList\x12 \n\x08sessions\x18\x01 \x03(\x0b\x32\x0e.flame.Session\";\n\x0f\x41pplicationList\x12(\n\x0c\x61pplications\x18\x01 \x03(\x0b\x32\x12.flame.Application\"2\n\x13ResourceRequirement\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x04\x12\x0e\n\x06memory\x18\x02 \x01(\x04\"\n\n\x08NodeSpec\"$\n\x08NodeInfo\x12\x0c\n\x04\x61rch\x18\x01 \x01(\t\x12\n\n\x02os\x18\x02 
\x01(\t\"\xab\x01\n\nNodeStatus\x12\x1f\n\x05state\x18\x01 \x01(\x0e\x32\x10.flame.NodeState\x12,\n\x08\x63\x61pacity\x18\x02 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12/\n\x0b\x61llocatable\x18\x03 \x01(\x0b\x32\x1a.flame.ResourceRequirement\x12\x1d\n\x04info\x18\x04 \x01(\x0b\x32\x0f.flame.NodeInfo\"k\n\x04Node\x12!\n\x08metadata\x18\x01 \x01(\x0b\x32\x0f.flame.Metadata\x12\x1d\n\x04spec\x18\x02 \x01(\x0b\x32\x0f.flame.NodeSpec\x12!\n\x06status\x18\x03 \x01(\x0b\x32\x11.flame.NodeStatus\"?\n\x06Result\x12\x13\n\x0breturn_code\x18\x01 \x01(\x05\x12\x14\n\x07message\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\n\n\x08_message\"c\n\nTaskResult\x12\x13\n\x0breturn_code\x18\x01 \x01(\x05\x12\x13\n\x06output\x18\x02 \x01(\x0cH\x00\x88\x01\x01\x12\x14\n\x07message\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\t\n\x07_outputB\n\n\x08_message\"\x0e\n\x0c\x45mptyRequest\"N\n\x05\x45vent\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x14\n\x07message\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x15\n\rcreation_time\x18\x03 \x01(\x03\x42\n\n\x08_message*$\n\x0cSessionState\x12\x08\n\x04Open\x10\x00\x12\n\n\x06\x43losed\x10\x01*>\n\tTaskState\x12\x0b\n\x07Pending\x10\x00\x12\x0b\n\x07Running\x10\x01\x12\x0b\n\x07Succeed\x10\x02\x12\n\n\x06\x46\x61iled\x10\x03*\x1a\n\x04Shim\x12\x08\n\x04Host\x10\x00\x12\x08\n\x04Wasm\x10\x01*-\n\x10\x41pplicationState\x12\x0b\n\x07\x45nabled\x10\x00\x12\x0c\n\x08\x44isabled\x10\x01*\xb4\x01\n\rExecutorState\x12\x13\n\x0f\x45xecutorUnknown\x10\x00\x12\x10\n\x0c\x45xecutorVoid\x10\x01\x12\x10\n\x0c\x45xecutorIdle\x10\x02\x12\x13\n\x0f\x45xecutorBinding\x10\x03\x12\x11\n\rExecutorBound\x10\x04\x12\x15\n\x11\x45xecutorUnbinding\x10\x05\x12\x15\n\x11\x45xecutorReleasing\x10\x06\x12\x14\n\x10\x45xecutorReleased\x10\x07*1\n\tNodeState\x12\x0b\n\x07Unknown\x10\x00\x12\t\n\x05Ready\x10\x01\x12\x0c\n\x08NotReady\x10\x02\x42&Z$github.com/flame-sh/flame/sdk/go/rpcb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -32,70 +32,70 @@ if not _descriptor._USE_C_DESCRIPTORS: _globals['DESCRIPTOR']._loaded_options = None _globals['DESCRIPTOR']._serialized_options = b'Z$github.com/flame-sh/flame/sdk/go/rpc' - _globals['_SESSIONSTATE']._serialized_start=2807 - _globals['_SESSIONSTATE']._serialized_end=2843 - _globals['_TASKSTATE']._serialized_start=2845 - _globals['_TASKSTATE']._serialized_end=2907 - _globals['_SHIM']._serialized_start=2909 - _globals['_SHIM']._serialized_end=2935 - _globals['_APPLICATIONSTATE']._serialized_start=2937 - _globals['_APPLICATIONSTATE']._serialized_end=2982 - _globals['_EXECUTORSTATE']._serialized_start=2985 - _globals['_EXECUTORSTATE']._serialized_end=3165 - _globals['_NODESTATE']._serialized_start=3167 - _globals['_NODESTATE']._serialized_end=3216 + _globals['_SESSIONSTATE']._serialized_start=2877 + _globals['_SESSIONSTATE']._serialized_end=2913 + _globals['_TASKSTATE']._serialized_start=2915 + _globals['_TASKSTATE']._serialized_end=2977 + _globals['_SHIM']._serialized_start=2979 + _globals['_SHIM']._serialized_end=3005 + _globals['_APPLICATIONSTATE']._serialized_start=3007 + _globals['_APPLICATIONSTATE']._serialized_end=3052 + _globals['_EXECUTORSTATE']._serialized_start=3055 + _globals['_EXECUTORSTATE']._serialized_end=3235 + _globals['_NODESTATE']._serialized_start=3237 + _globals['_NODESTATE']._serialized_end=3286 _globals['_METADATA']._serialized_start=22 _globals['_METADATA']._serialized_end=58 _globals['_SESSIONSTATUS']._serialized_start=61 _globals['_SESSIONSTATUS']._serialized_end=282 - 
_globals['_SESSIONSPEC']._serialized_start=284 - _globals['_SESSIONSPEC']._serialized_end=375 - _globals['_SESSION']._serialized_start=377 - _globals['_SESSION']._serialized_end=493 - _globals['_TASKSTATUS']._serialized_start=496 - _globals['_TASKSTATUS']._serialized_end=644 - _globals['_TASKSPEC']._serialized_start=646 - _globals['_TASKSPEC']._serialized_end=738 - _globals['_TASK']._serialized_start=740 - _globals['_TASK']._serialized_end=847 - _globals['_APPLICATIONSTATUS']._serialized_start=849 - _globals['_APPLICATIONSTATUS']._serialized_end=931 - _globals['_ENVIRONMENT']._serialized_start=933 - _globals['_ENVIRONMENT']._serialized_end=975 - _globals['_APPLICATIONSCHEMA']._serialized_start=977 - _globals['_APPLICATIONSCHEMA']._serialized_end=1100 - _globals['_APPLICATIONSPEC']._serialized_start=1103 - _globals['_APPLICATIONSPEC']._serialized_end=1560 - _globals['_APPLICATION']._serialized_start=1563 - _globals['_APPLICATION']._serialized_end=1691 - _globals['_EXECUTORSPEC']._serialized_start=1693 - _globals['_EXECUTORSPEC']._serialized_end=1780 - _globals['_EXECUTORSTATUS']._serialized_start=1782 - _globals['_EXECUTORSTATUS']._serialized_end=1875 - _globals['_EXECUTOR']._serialized_start=1877 - _globals['_EXECUTOR']._serialized_end=1996 - _globals['_EXECUTORLIST']._serialized_start=1998 - _globals['_EXECUTORLIST']._serialized_end=2048 - _globals['_SESSIONLIST']._serialized_start=2050 - _globals['_SESSIONLIST']._serialized_end=2097 - _globals['_APPLICATIONLIST']._serialized_start=2099 - _globals['_APPLICATIONLIST']._serialized_end=2158 - _globals['_RESOURCEREQUIREMENT']._serialized_start=2160 - _globals['_RESOURCEREQUIREMENT']._serialized_end=2210 - _globals['_NODESPEC']._serialized_start=2212 - _globals['_NODESPEC']._serialized_end=2222 - _globals['_NODEINFO']._serialized_start=2224 - _globals['_NODEINFO']._serialized_end=2260 - _globals['_NODESTATUS']._serialized_start=2263 - _globals['_NODESTATUS']._serialized_end=2434 - _globals['_NODE']._serialized_start=2436 - _globals['_NODE']._serialized_end=2543 - _globals['_RESULT']._serialized_start=2545 - _globals['_RESULT']._serialized_end=2608 - _globals['_TASKRESULT']._serialized_start=2610 - _globals['_TASKRESULT']._serialized_end=2709 - _globals['_EMPTYREQUEST']._serialized_start=2711 - _globals['_EMPTYREQUEST']._serialized_end=2725 - _globals['_EVENT']._serialized_start=2727 - _globals['_EVENT']._serialized_end=2805 + _globals['_SESSIONSPEC']._serialized_start=285 + _globals['_SESSIONSPEC']._serialized_end=445 + _globals['_SESSION']._serialized_start=447 + _globals['_SESSION']._serialized_end=563 + _globals['_TASKSTATUS']._serialized_start=566 + _globals['_TASKSTATUS']._serialized_end=714 + _globals['_TASKSPEC']._serialized_start=716 + _globals['_TASKSPEC']._serialized_end=808 + _globals['_TASK']._serialized_start=810 + _globals['_TASK']._serialized_end=917 + _globals['_APPLICATIONSTATUS']._serialized_start=919 + _globals['_APPLICATIONSTATUS']._serialized_end=1001 + _globals['_ENVIRONMENT']._serialized_start=1003 + _globals['_ENVIRONMENT']._serialized_end=1045 + _globals['_APPLICATIONSCHEMA']._serialized_start=1047 + _globals['_APPLICATIONSCHEMA']._serialized_end=1170 + _globals['_APPLICATIONSPEC']._serialized_start=1173 + _globals['_APPLICATIONSPEC']._serialized_end=1630 + _globals['_APPLICATION']._serialized_start=1633 + _globals['_APPLICATION']._serialized_end=1761 + _globals['_EXECUTORSPEC']._serialized_start=1763 + _globals['_EXECUTORSPEC']._serialized_end=1850 + _globals['_EXECUTORSTATUS']._serialized_start=1852 + 
_globals['_EXECUTORSTATUS']._serialized_end=1945 + _globals['_EXECUTOR']._serialized_start=1947 + _globals['_EXECUTOR']._serialized_end=2066 + _globals['_EXECUTORLIST']._serialized_start=2068 + _globals['_EXECUTORLIST']._serialized_end=2118 + _globals['_SESSIONLIST']._serialized_start=2120 + _globals['_SESSIONLIST']._serialized_end=2167 + _globals['_APPLICATIONLIST']._serialized_start=2169 + _globals['_APPLICATIONLIST']._serialized_end=2228 + _globals['_RESOURCEREQUIREMENT']._serialized_start=2230 + _globals['_RESOURCEREQUIREMENT']._serialized_end=2280 + _globals['_NODESPEC']._serialized_start=2282 + _globals['_NODESPEC']._serialized_end=2292 + _globals['_NODEINFO']._serialized_start=2294 + _globals['_NODEINFO']._serialized_end=2330 + _globals['_NODESTATUS']._serialized_start=2333 + _globals['_NODESTATUS']._serialized_end=2504 + _globals['_NODE']._serialized_start=2506 + _globals['_NODE']._serialized_end=2613 + _globals['_RESULT']._serialized_start=2615 + _globals['_RESULT']._serialized_end=2678 + _globals['_TASKRESULT']._serialized_start=2680 + _globals['_TASKRESULT']._serialized_end=2779 + _globals['_EMPTYREQUEST']._serialized_start=2781 + _globals['_EMPTYREQUEST']._serialized_end=2795 + _globals['_EVENT']._serialized_start=2797 + _globals['_EVENT']._serialized_end=2875 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/src/flamepy/rl/__init__.py b/sdk/python/src/flamepy/rl/__init__.py index 59cd92cd..6bd9e636 100644 --- a/sdk/python/src/flamepy/rl/__init__.py +++ b/sdk/python/src/flamepy/rl/__init__.py @@ -13,7 +13,7 @@ from .runner import ObjectFuture, ObjectFutureIterator, Runner, RunnerService from .runpy import FlameRunpyService -from .types import RunnerContext, RunnerRequest, RunnerServiceKind +from .types import RunnerContext, RunnerRequest __all__ = [ "ObjectFuture", @@ -21,7 +21,6 @@ "Runner", "RunnerService", "FlameRunpyService", - "RunnerServiceKind", "RunnerContext", "RunnerRequest", ] diff --git a/sdk/python/src/flamepy/rl/runner.py b/sdk/python/src/flamepy/rl/runner.py index d3e94ead..4e8bef82 100644 --- a/sdk/python/src/flamepy/rl/runner.py +++ b/sdk/python/src/flamepy/rl/runner.py @@ -34,7 +34,6 @@ from flamepy.rl.types import ( RunnerContext, RunnerRequest, - RunnerServiceKind, ) logger = logging.getLogger(__name__) @@ -127,7 +126,7 @@ class RunnerService: _session: The Flame session for task execution """ - def __init__(self, app: str, execution_object: Any, kind: Optional[RunnerServiceKind] = None): + def __init__(self, app: str, execution_object: Any, stateful: bool = False, autoscale: bool = True): """Initialize a RunnerService. Args: @@ -135,7 +134,10 @@ def __init__(self, app: str, execution_object: Any, kind: Optional[RunnerService The associated service must be flamepy.rl.runpy. execution_object: The Python execution object to be managed and exposed as a remote service. - kind: The runner service kind, if specified. + stateful: If True, persist the execution object state back to flame-cache + after each task. If False, do not persist state. + autoscale: If True, create instances dynamically based on pending tasks (min=0, max=None). + If False, create exactly one instance (min=1, max=1). 
""" self._app = app self._execution_object = execution_object @@ -144,7 +146,7 @@ def __init__(self, app: str, execution_object: Any, kind: Optional[RunnerService # Create a session with flamepy.rl.runpy service # For RL module: serialize RunnerContext with cloudpickle, put in cache to get ObjectRef, # then encode ObjectRef to bytes for core API - runner_context = RunnerContext(execution_object=execution_object, kind=kind) + runner_context = RunnerContext(execution_object=execution_object, stateful=stateful, autoscale=autoscale) # Serialize the context using cloudpickle serialized_ctx = cloudpickle.dumps(runner_context, protocol=cloudpickle.DEFAULT_PROTOCOL) # Generate a temporary session_id for caching (will be regenerated by create_session if needed) @@ -153,9 +155,10 @@ def __init__(self, app: str, execution_object: Any, kind: Optional[RunnerService object_ref = put_object(temp_session_id, serialized_ctx) # Encode ObjectRef to bytes for core API common_data_bytes = object_ref.encode() - self._session = create_session(application=app, common_data=common_data_bytes, session_id=temp_session_id) + # Pass min_instances and max_instances from RunnerContext to create_session + self._session = create_session(application=app, common_data=common_data_bytes, session_id=temp_session_id, min_instances=runner_context.min_instances, max_instances=runner_context.max_instances) - logger.debug(f"Created RunnerService for app '{app}' with session '{self._session.id}'") + logger.debug(f"Created RunnerService for app '{app}' with session '{self._session.id}' (stateful={stateful}, autoscale={autoscale})") # Generate wrapper methods for all public methods of the execution object self._generate_wrappers() @@ -418,27 +421,48 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: except Exception as e: logger.error(f"Error removing local package: {e}", exc_info=True) - def service(self, execution_object: Any, kind: Optional[RunnerServiceKind] = None) -> RunnerService: + def service(self, execution_object: Any, stateful: Optional[bool] = None, autoscale: Optional[bool] = None) -> RunnerService: """Create a RunnerService for the given execution object. - If execution_object is a class, it will be instantiated using its default - constructor. The resulting RunnerService exposes all callable methods. - Args: execution_object: A function, class, or class instance to expose as a service - kind: The runner service kind. If None, defaults based on execution_object + stateful: If True, persist the execution object state back to flame-cache + after each task. If False, do not persist state. If None, use default + based on execution_object type (default: False for all types). + autoscale: If True, create instances dynamically based on pending tasks (min=0, max=None). + If False, create exactly one instance (min=1, max=1). + If None, use default based on execution_object type. 
Returns: A RunnerService instance - """ - - # If it's a class, instantiate it - if inspect.isclass(execution_object): - logger.debug(f"Instantiating class {execution_object.__name__}") - execution_object = execution_object() - # Create the RunnerService - runner_service = RunnerService(self._name, execution_object, kind=kind) + Raises: + ValueError: If stateful=True is set for a class (only instances can be stateful) + """ + # Step 1: Determine execution object type + is_function = callable(execution_object) and not inspect.isclass(execution_object) + is_class = inspect.isclass(execution_object) + + # Step 2: Apply defaults if not specified + if stateful is None: + stateful = False # All types default to False + + if autoscale is None: + if is_function: + autoscale = True # Functions benefit from autoscaling + else: # class or instance + autoscale = False # Classes/instances typically want single instance + + # Step 3: Validation + if stateful and is_class: + raise ValueError("Cannot set stateful=True for a class. Classes themselves cannot maintain state; only instances can. Pass an instance instead, or set stateful=False.") + + # Step 4: Do NOT instantiate classes (keep as-is) + # The class will be instantiated on each executor in FlameRunpyService.on_session_enter + logger.debug(f"Creating service for {type(execution_object).__name__} (stateful={stateful}, autoscale={autoscale})") + + # Step 5: Create the RunnerService + runner_service = RunnerService(self._name, execution_object, stateful=stateful, autoscale=autoscale) self._services.append(runner_service) logger.debug(f"Created service for execution object in Runner '{self._name}'") diff --git a/sdk/python/src/flamepy/rl/runpy.py b/sdk/python/src/flamepy/rl/runpy.py index f0f5db45..bd512d58 100644 --- a/sdk/python/src/flamepy/rl/runpy.py +++ b/sdk/python/src/flamepy/rl/runpy.py @@ -12,6 +12,7 @@ """ import importlib +import inspect import logging import os import site @@ -27,7 +28,7 @@ from flamepy.core import ObjectRef, get_object, put_object, update_object from flamepy.core.service import FlameService, SessionContext, TaskContext from flamepy.core.types import TaskOutput -from flamepy.rl.types import RunnerContext, RunnerRequest, RunnerServiceKind +from flamepy.rl.types import RunnerContext, RunnerRequest logger = logging.getLogger(__name__) @@ -44,6 +45,8 @@ class FlameRunpyService(FlameService): def __init__(self): """Initialize the FlameRunpyService.""" self._ssn_ctx: SessionContext = None + self._execution_object: Any = None # Cached execution object + self._runner_context: RunnerContext = None # Configuration def _resolve_object_ref(self, value: Any) -> Any: """ @@ -223,6 +226,7 @@ def on_session_enter(self, context: SessionContext) -> bool: Handle session enter event. If the application URL is specified, install the package into the current .venv. + Loads the RunnerContext and execution object, instantiating classes if needed. 
Args: context: Session context containing application and session information @@ -245,7 +249,38 @@ def on_session_enter(self, context: SessionContext) -> bool: else: logger.info("No application URL specified, skipping package installation") - logger.info("Session entered successfully") + # Step 1: Load RunnerContext from common_data + common_data_bytes = context.common_data() + if common_data_bytes is None: + raise ValueError("Common data is None in session context") + + # Decode bytes to ObjectRef + object_ref = ObjectRef.decode(common_data_bytes) + # Get from cache + serialized_ctx = get_object(object_ref) + # Deserialize using cloudpickle + runner_context = cloudpickle.loads(serialized_ctx) + + if not isinstance(runner_context, RunnerContext): + raise ValueError(f"Expected RunnerContext in common_data, got {type(runner_context)}") + + # Step 2: Store configuration + self._runner_context = runner_context + + # Step 3: Load execution object + execution_object = runner_context.execution_object + if execution_object is None: + raise ValueError("Execution object is None in RunnerContext") + + # Step 4: If it's a class, instantiate it + if inspect.isclass(execution_object): + logger.info(f"Instantiating class {execution_object.__name__}") + execution_object = execution_object() # Use default constructor + + # Step 5: Store execution object for reuse + self._execution_object = execution_object + + logger.info(f"Session entered successfully, execution object loaded (stateful={runner_context.stateful}, autoscale={runner_context.autoscale})") return True def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: @@ -253,11 +288,12 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: Handle task invoke event. This method: - 1. Retrieves the execution object from session context + 1. Uses the cached execution object from on_session_enter 2. Deserializes the RunnerRequest from task input 3. Resolves any ObjectRef instances in args/kwargs 4. Executes the requested method on the execution object - 5. Returns the result as bytes + 5. Persists state if stateful=True + 6. Returns the result as bytes Args: context: Task context containing task ID, session ID, and input @@ -271,29 +307,14 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: logger.info(f"Invoking task: {context.task_id}") try: - # Get the execution object from session context - # For RL module: get bytes from core API, decode to ObjectRef, get from cache, then deserialize - common_data_bytes = self._ssn_ctx.common_data() - if common_data_bytes is None: - raise ValueError("Common data is None in session context") - - # Decode bytes to ObjectRef - object_ref = ObjectRef.decode(common_data_bytes) - # Get from cache - serialized_ctx = get_object(object_ref) - # Deserialize using cloudpickle - common_data = cloudpickle.loads(serialized_ctx) - - if not isinstance(common_data, RunnerContext): - raise ValueError(f"Expected RunnerContext in common_data, got {type(common_data)}") - - execution_object = common_data.execution_object + # Step 1: Use cached execution object (not from common_data) + execution_object = self._execution_object if execution_object is None: - raise ValueError("Execution object is None in RunnerContext") + raise ValueError("Execution object is None. 
Session may not have been entered properly.") logger.debug(f"Execution object type: {type(execution_object)}") - # Get the RunnerRequest from task input + # Step 2: Get the RunnerRequest from task input # For RL module: receive bytes from core API, deserialize with cloudpickle if context.input is None: raise ValueError("Task input is None") @@ -312,7 +333,7 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: logger.debug(f"RunnerRequest: method={request.method}, has_args={request.args is not None}, has_kwargs={request.kwargs is not None}") - # Resolve ObjectRef instances in args and kwargs + # Step 3: Resolve ObjectRef instances in args and kwargs invoke_args = () invoke_kwargs = {} @@ -332,7 +353,7 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: invoke_kwargs = {key: self._resolve_object_ref(value) for key, value in request.kwargs.items()} logger.debug(f"Resolved kwargs: {len(invoke_kwargs)} keyword arguments") - # Execute the requested method + # Step 4: Execute the requested method if request.method is None: # The execution object itself is callable if not callable(execution_object): @@ -354,25 +375,27 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: logger.info(f"Task {context.task_id} completed successfully") logger.debug(f"Result type: {type(result)}") - if common_data.kind == RunnerServiceKind.Stateless: - logger.debug("Skipping common data update for stateless runner") - else: - # Update common data with the modified execution object to persist state - # This is important for stateful classes where instance variables change - logger.debug("Updating common data with modified execution object") + # Step 5: Update execution object state if stateful + if self._runner_context.stateful: + logger.debug("Persisting execution object state") updated_context = RunnerContext( - execution_object=execution_object, - kind=common_data.kind, + execution_object=execution_object, # Updated object + stateful=self._runner_context.stateful, + autoscale=self._runner_context.autoscale, ) # For RL module: serialize RunnerContext with cloudpickle, update in cache to get ObjectRef, # then encode ObjectRef to bytes for core API serialized_ctx = cloudpickle.dumps(updated_context, protocol=cloudpickle.DEFAULT_PROTOCOL) - # Update existing ObjectRef with latest version in cache - object_ref = update_object(object_ref, serialized_ctx) - logger.debug("Common data updated successfully in cache") + # Get original ObjectRef and update it + common_data_bytes = self._ssn_ctx.common_data() + object_ref = ObjectRef.decode(common_data_bytes) + update_object(object_ref, serialized_ctx) + logger.debug("Execution object state persisted successfully in cache") + else: + logger.debug("Skipping state persistence for non-stateful service") - # Put the result into cache and return ObjectRef encoded as bytes + # Step 6: Put the result into cache and return ObjectRef encoded as bytes # This enables efficient data transfer for large objects logger.debug("Putting result into cache") result_object_ref = put_object(context.session_id, result) diff --git a/sdk/python/src/flamepy/rl/types.py b/sdk/python/src/flamepy/rl/types.py index 9bacc17e..cf6b4b09 100644 --- a/sdk/python/src/flamepy/rl/types.py +++ b/sdk/python/src/flamepy/rl/types.py @@ -12,18 +12,10 @@ """ import inspect -from dataclasses import dataclass -from enum import IntEnum +from dataclasses import dataclass, field from typing import Any, Dict, Optional, Tuple -class RunnerServiceKind(IntEnum): - 
"""Runner service kind enumeration.""" - - Stateful = 0 - Stateless = 1 - - @dataclass class RunnerContext: """Context for runner session containing the shared execution object. @@ -33,21 +25,35 @@ class RunnerContext: Attributes: execution_object: The execution object for the customized session. This can be - any Python object (function, class instance, etc.) that will + any Python object (function, class, instance, etc.) that will be used to execute tasks within the session. - kind: The kind of the runner service, if specified. + stateful: If True, persist the execution object state back to flame-cache + after each task. If False, do not persist state. + autoscale: If True, create instances dynamically based on pending tasks (min=0, max=None). + If False, create exactly one instance (min=1, max=1). + min_instances: Minimum number of instances (computed from autoscale) + max_instances: Maximum number of instances (computed from autoscale) """ execution_object: Any - kind: Optional[RunnerServiceKind] = None + stateful: bool = False + autoscale: bool = True + min_instances: int = field(init=False, repr=False) + max_instances: Optional[int] = field(init=False, repr=False) def __post_init__(self) -> None: - if self.kind is not None: - return - if inspect.isfunction(self.execution_object) or inspect.isbuiltin(self.execution_object) or (inspect.isclass(self.execution_object) and self.execution_object.__module__ == "builtins"): - self.kind = RunnerServiceKind.Stateless + """Compute min/max instances and validate configuration.""" + # Compute min/max instances based on autoscale + if self.autoscale: + self.min_instances = 0 + self.max_instances = None # Unlimited else: - self.kind = RunnerServiceKind.Stateful + self.min_instances = 1 + self.max_instances = 1 # Single instance + + # Validation: classes cannot be stateful (only instances can) + if self.stateful and inspect.isclass(self.execution_object): + raise ValueError("Cannot set stateful=True for a class. Classes themselves cannot maintain state; only instances can. 
Pass an instance instead, or set stateful=False.") @dataclass diff --git a/sdk/rust/protos/types.proto b/sdk/rust/protos/types.proto index e943a8a8..8fde8d2d 100644 --- a/sdk/rust/protos/types.proto +++ b/sdk/rust/protos/types.proto @@ -32,6 +32,8 @@ message SessionSpec { string application = 2; uint32 slots = 3; optional bytes common_data = 4; + uint32 min_instances = 5; // Minimum number of instances (default: 0) + optional uint32 max_instances = 6; // Maximum number of instances (null means unlimited) } message Session { diff --git a/sdk/rust/src/client/mod.rs b/sdk/rust/src/client/mod.rs index b154dcbb..bb4f18de 100644 --- a/sdk/rust/src/client/mod.rs +++ b/sdk/rust/src/client/mod.rs @@ -73,6 +73,8 @@ pub struct SessionAttributes { pub slots: u32, #[serde(with = "serde_message")] pub common_data: Option, + pub min_instances: u32, + pub max_instances: Option, } #[derive(Clone, Serialize, Deserialize)] @@ -187,6 +189,8 @@ impl Connection { application: attrs.application.clone(), slots: attrs.slots, common_data: attrs.common_data.clone().map(CommonData::into), + min_instances: attrs.min_instances, + max_instances: attrs.max_instances, }), }; diff --git a/sdk/rust/tests/integration_test.rs b/sdk/rust/tests/integration_test.rs index 160ada11..2f846fee 100644 --- a/sdk/rust/tests/integration_test.rs +++ b/sdk/rust/tests/integration_test.rs @@ -59,6 +59,8 @@ async fn test_create_session() -> Result<(), FlameError> { application: FLAME_DEFAULT_APP.to_string(), slots: 1, common_data: None, + min_instances: 0, + max_instances: None, }; let ssn = conn.create_session(&ssn_attr).await?; @@ -81,6 +83,8 @@ async fn test_create_multiple_sessions() -> Result<(), FlameError> { application: FLAME_DEFAULT_APP.to_string(), slots: 1, common_data: None, + min_instances: 0, + max_instances: None, }; let ssn = conn.create_session(&ssn_attr).await?; @@ -101,6 +105,8 @@ async fn test_create_session_with_tasks() -> Result<(), FlameError> { application: FLAME_DEFAULT_APP.to_string(), slots: 1, common_data: None, + min_instances: 0, + max_instances: None, }; let ssn = conn.create_session(&ssn_attr).await?; @@ -159,6 +165,8 @@ async fn test_create_multiple_sessions_with_tasks() -> Result<(), FlameError> { application: FLAME_DEFAULT_APP.to_string(), slots: 1, common_data: None, + min_instances: 0, + max_instances: None, }; let ssn_1 = conn.create_session(&ssn_1_attr).await?; assert_eq!(ssn_1.state, SessionState::Open); @@ -168,6 +176,8 @@ async fn test_create_multiple_sessions_with_tasks() -> Result<(), FlameError> { application: FLAME_DEFAULT_APP.to_string(), slots: 1, common_data: None, + min_instances: 0, + max_instances: None, }; let ssn_2 = conn.create_session(&ssn_2_attr).await?; assert_eq!(ssn_2.state, SessionState::Open); diff --git a/session_manager/migrations/sqlite/20260123000000_add_session_instances.sql b/session_manager/migrations/sqlite/20260123000000_add_session_instances.sql new file mode 100644 index 00000000..da2709ec --- /dev/null +++ b/session_manager/migrations/sqlite/20260123000000_add_session_instances.sql @@ -0,0 +1,17 @@ +-- Add min_instances and max_instances columns to sessions table for RFE323 +-- This migration adds support for controlling the minimum and maximum number +-- of executor instances that can be allocated to a session. 
+ +-- Add min_instances column (minimum number of instances to maintain) +-- Default is 0 (no minimum guarantee) +ALTER TABLE sessions +ADD COLUMN min_instances INTEGER NOT NULL DEFAULT 0; + +-- Add max_instances column (maximum number of instances allowed) +-- NULL means unlimited +ALTER TABLE sessions +ADD COLUMN max_instances INTEGER; + +-- Note: Existing sessions will automatically get default values: +-- - min_instances = 0 (no minimum) +-- - max_instances = NULL (unlimited) diff --git a/session_manager/src/apiserver/frontend.rs b/session_manager/src/apiserver/frontend.rs index dbc2e837..7866f1fb 100644 --- a/session_manager/src/apiserver/frontend.rs +++ b/session_manager/src/apiserver/frontend.rs @@ -13,7 +13,7 @@ limitations under the License. use std::pin::Pin; use async_trait::async_trait; -use common::apis::ApplicationAttributes; +use common::apis::{ApplicationAttributes, SessionAttributes}; use futures::Stream; use serde_json::Value; use stdng::trace_fn; @@ -255,18 +255,27 @@ impl Frontend for Flame { .parse::() .map_err(|_| Status::invalid_argument("invalid session id"))?; - let common_data = ssn_spec.common_data.map(apis::CommonData::from); + let attr = SessionAttributes { + id: ssn_id, + application: ssn_spec.application, + slots: ssn_spec.slots, + common_data: ssn_spec.common_data.map(apis::CommonData::from), + min_instances: ssn_spec.min_instances, + max_instances: ssn_spec.max_instances, + }; tracing::debug!( - "common_data: {:?}, application: {:?}, slots: {:?}", - common_data, - ssn_spec.application, - ssn_spec.slots + "Creating session with attributes: id={}, application={}, slots={}, min_instances={}, max_instances={:?}", + attr.id, + attr.application, + attr.slots, + attr.min_instances, + attr.max_instances ); let ssn = self .controller - .create_session(ssn_id, ssn_spec.application, ssn_spec.slots, common_data) + .create_session(attr) .await .map(Session::from) .map_err(Status::from)?; diff --git a/session_manager/src/controller/mod.rs b/session_manager/src/controller/mod.rs index 522e5ccd..87fd69d1 100644 --- a/session_manager/src/controller/mod.rs +++ b/session_manager/src/controller/mod.rs @@ -18,8 +18,8 @@ use std::task::{Context, Poll}; use common::apis::{ Application, ApplicationAttributes, ApplicationID, CommonData, Event, EventOwner, ExecutorID, - ExecutorState, Node, NodeState, Session, SessionID, SessionPtr, SessionState, Task, TaskGID, - TaskID, TaskInput, TaskOutput, TaskPtr, TaskResult, TaskState, + ExecutorState, Node, NodeState, Session, SessionAttributes, SessionID, SessionPtr, + SessionState, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskPtr, TaskResult, TaskState, }; use common::FlameError; @@ -58,17 +58,9 @@ impl Controller { self.storage.release_node(node_name).await } - pub async fn create_session( - &self, - id: SessionID, - app: String, - slots: u32, - common_data: Option, - ) -> Result { + pub async fn create_session(&self, attr: SessionAttributes) -> Result { trace_fn!("Controller::create_session"); - self.storage - .create_session(id, app, slots, common_data) - .await + self.storage.create_session(attr).await } pub async fn open_session(&self, id: SessionID) -> Result { diff --git a/session_manager/src/model/mod.rs b/session_manager/src/model/mod.rs index a3044fc5..d25f9554 100644 --- a/session_manager/src/model/mod.rs +++ b/session_manager/src/model/mod.rs @@ -99,6 +99,8 @@ pub struct SessionInfo { pub completion_time: Option>, pub state: SessionState, + pub min_instances: u32, // Minimum number of instances + pub max_instances: Option, 
// Maximum number of instances (None means unlimited) } #[derive(Clone, Debug, Default)] @@ -198,6 +200,8 @@ impl From<&Session> for SessionInfo { creation_time: ssn.creation_time, completion_time: ssn.completion_time, state: ssn.status.state, + min_instances: ssn.min_instances, // Map from Session + max_instances: ssn.max_instances, // Map from Session } } } diff --git a/session_manager/src/scheduler/actions/allocate.rs b/session_manager/src/scheduler/actions/allocate.rs index 84448fce..dfe3f9c8 100644 --- a/session_manager/src/scheduler/actions/allocate.rs +++ b/session_manager/src/scheduler/actions/allocate.rs @@ -66,6 +66,27 @@ impl Action for AllocateAction { continue; } + // Explicit max_instances check (safety guard) + // The fairshare plugin caches allocated count from snapshot at the start of the cycle. + // Within a single cycle, if we allocate multiple executors, the cached count doesn't update. + // To prevent over-allocation, we count actual executors from the current snapshot. + if let Some(max_instances) = ssn.max_instances { + let all_executors = ss.find_executors(None)?; + let current_count = all_executors + .values() + .filter(|e| e.ssn_id.as_ref() == Some(&ssn.id)) + .count(); + if current_count >= max_instances as usize { + tracing::debug!( + "Session <{}> has reached max_instances limit: {} >= {}", + ssn.id, + current_count, + max_instances + ); + continue; // Already at max limit + } + } + // If there're still some executors in pipeline, skip allocate new executor to the session. let pipelined_executors = ss.pipelined_executors(ssn.clone())?; if !pipelined_executors.is_empty() { diff --git a/session_manager/src/scheduler/mod.rs b/session_manager/src/scheduler/mod.rs index 1893b214..747ba265 100644 --- a/session_manager/src/scheduler/mod.rs +++ b/session_manager/src/scheduler/mod.rs @@ -169,12 +169,14 @@ mod tests { )?; tokio_test::block_on(controller.register_node(&new_test_node("node_1".to_string())))?; let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(controller.create_session( - ssn_1_id.clone(), - "flmtest".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_1_id.clone(), + application: "flmtest".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; for _ in 0..task_num { tokio_test::block_on(controller.create_task(ssn_1.id.clone(), None))?; diff --git a/session_manager/src/scheduler/plugins/fairshare.rs b/session_manager/src/scheduler/plugins/fairshare.rs index 96b7e508..9ac4f840 100644 --- a/session_manager/src/scheduler/plugins/fairshare.rs +++ b/session_manager/src/scheduler/plugins/fairshare.rs @@ -30,6 +30,8 @@ struct SSNInfo { pub desired: f64, pub deserved: f64, pub allocated: f64, + pub min_instances: u32, // Minimum number of instances + pub max_instances: Option, // Maximum number of instances (None means unlimited) } impl Eq for SSNInfo {} @@ -133,14 +135,24 @@ impl Plugin for FairShare { } if let Some(app) = apps.get(&ssn.application) { - desired = desired.min((app.max_instances * ssn.slots) as f64); + // Cap desired by session's max_instances (already includes app limit from session creation) + if let Some(max_instances) = ssn.max_instances { + desired = desired.min((max_instances * ssn.slots) as f64); + } + + // Ensure desired is at least min_instances * slots (minimum guarantee) + let min_allocation = (ssn.min_instances * ssn.slots) as f64; + desired = 
desired.max(min_allocation); self.ssn_map.insert( ssn.id.clone(), SSNInfo { id: ssn.id.clone(), desired, + deserved: min_allocation, // Initialize to min_instances (guaranteed minimum) slots: ssn.slots, + min_instances: ssn.min_instances, + max_instances: ssn.max_instances, ..SSNInfo::default() }, ); @@ -169,6 +181,12 @@ impl Plugin for FairShare { ); } + // Reserve slots for guaranteed minimums before fair distribution + for ssn in self.ssn_map.values() { + let min_allocation = (ssn.min_instances * ssn.slots) as f64; + remaining_slots -= min_allocation; + } + let executors = ss.find_executors(ALL_EXECUTOR)?; for exe in executors.values() { if let Some(node) = self.node_map.get_mut(&exe.node) { diff --git a/session_manager/src/storage/engine/mod.rs b/session_manager/src/storage/engine/mod.rs index 81ba9dc3..f5c55169 100644 --- a/session_manager/src/storage/engine/mod.rs +++ b/session_manager/src/storage/engine/mod.rs @@ -17,8 +17,8 @@ use async_trait::async_trait; use crate::FlameError; use common::apis::{ - Application, ApplicationAttributes, ApplicationID, CommonData, Event, Session, SessionID, Task, - TaskGID, TaskInput, TaskOutput, TaskResult, TaskState, + Application, ApplicationAttributes, ApplicationID, CommonData, Event, Session, + SessionAttributes, SessionID, Task, TaskGID, TaskInput, TaskOutput, TaskResult, TaskState, }; mod sqlite; @@ -42,13 +42,7 @@ pub trait Engine: Send + Sync + 'static { async fn get_application(&self, id: ApplicationID) -> Result; async fn find_application(&self) -> Result, FlameError>; - async fn create_session( - &self, - id: SessionID, - app: String, - slots: u32, - common_data: Option, - ) -> Result; + async fn create_session(&self, attr: SessionAttributes) -> Result; async fn get_session(&self, id: SessionID) -> Result; async fn close_session(&self, id: SessionID) -> Result; async fn delete_session(&self, id: SessionID) -> Result; diff --git a/session_manager/src/storage/engine/sqlite.rs b/session_manager/src/storage/engine/sqlite.rs index 98c91339..29adb8b0 100644 --- a/session_manager/src/storage/engine/sqlite.rs +++ b/session_manager/src/storage/engine/sqlite.rs @@ -32,9 +32,9 @@ use stdng::{logs::TraceFn, trace_fn}; use common::{ apis::{ Application, ApplicationAttributes, ApplicationID, ApplicationSchema, ApplicationState, - CommonData, Event, Session, SessionID, SessionState, SessionStatus, Shim, Task, TaskGID, - TaskID, TaskInput, TaskOutput, TaskResult, TaskState, DEFAULT_DELAY_RELEASE, - DEFAULT_MAX_INSTANCES, + CommonData, Event, Session, SessionAttributes, SessionID, SessionState, SessionStatus, + Shim, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskResult, TaskState, + DEFAULT_DELAY_RELEASE, DEFAULT_MAX_INSTANCES, }, FlameError, }; @@ -367,38 +367,36 @@ impl Engine for SqliteEngine { .collect()) } - async fn create_session( - &self, - id: SessionID, - app: String, - slots: u32, - common_data: Option, - ) -> Result { + async fn create_session(&self, attr: SessionAttributes) -> Result { let mut tx = self .pool .begin() .await .map_err(|e| FlameError::Storage(e.to_string()))?; - let common_data: Option> = common_data.map(Bytes::into); - let sql = r#"INSERT INTO sessions (id, application, slots, common_data, creation_time, state) + let common_data: Option> = attr.common_data.map(Bytes::into); + let sql = r#"INSERT INTO sessions (id, application, slots, common_data, creation_time, state, min_instances, max_instances) VALUES ( ?, (SELECT name FROM applications WHERE name=? AND state=?), ?, ?, ?, + ?, + ?, ? 
) RETURNING *"#; let ssn: SessionDao = sqlx::query_as(sql) - .bind(id.clone()) - .bind(app) + .bind(attr.id.clone()) + .bind(attr.application) .bind(ApplicationState::Enabled as i32) - .bind(slots) + .bind(attr.slots) .bind(common_data) .bind(Utc::now().timestamp()) .bind(SessionState::Open as i32) + .bind(attr.min_instances as i64) + .bind(attr.max_instances.map(|v| v as i64)) .fetch_one(&mut *tx) .await .map_err(|e| FlameError::Storage(e.to_string()))?; @@ -741,12 +739,14 @@ mod tests { } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); assert_eq!(ssn_1.status.state, SessionState::Open); @@ -847,12 +847,14 @@ mod tests { let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); assert_eq!(ssn_1.status.state, SessionState::Open); @@ -1164,12 +1166,14 @@ mod tests { } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); @@ -1216,12 +1220,14 @@ mod tests { } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); @@ -1248,12 +1254,14 @@ mod tests { assert_eq!(task_1_2.state, TaskState::Succeed); let ssn_2_id = format!("ssn-2-{}", Utc::now().timestamp()); - let ssn_2 = tokio_test::block_on(storage.create_session( - ssn_2_id.clone(), - "flmping".to_string(), - 1, - None, - ))?; + let ssn_2 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_2_id.clone(), + application: "flmping".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_2.id, ssn_2_id); assert_eq!(ssn_2.application, "flmping"); @@ -1301,12 +1309,14 @@ mod tests { tokio_test::block_on(storage.register_application(name.clone(), attr))?; } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = 
tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); @@ -1336,12 +1346,14 @@ mod tests { tokio_test::block_on(storage.register_application(name.clone(), attr))?; } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); @@ -1377,12 +1389,14 @@ mod tests { tokio_test::block_on(storage.register_application(name.clone(), attr))?; } let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp()); - let ssn_1 = tokio_test::block_on(storage.create_session( - ssn_1_id.clone(), - "flmexec".to_string(), - 1, - None, - ))?; + let ssn_1 = tokio_test::block_on(storage.create_session(SessionAttributes { + id: ssn_1_id.clone(), + application: "flmexec".to_string(), + slots: 1, + common_data: None, + min_instances: 0, + max_instances: None, + }))?; assert_eq!(ssn_1.id, ssn_1_id); assert_eq!(ssn_1.application, "flmexec"); diff --git a/session_manager/src/storage/engine/types.rs b/session_manager/src/storage/engine/types.rs index cd2eb4ea..e3450257 100644 --- a/session_manager/src/storage/engine/types.rs +++ b/session_manager/src/storage/engine/types.rs @@ -72,6 +72,8 @@ pub struct SessionDao { pub completion_time: Option, pub state: i32, + pub min_instances: i64, // Minimum number of instances + pub max_instances: Option, // Maximum number of instances (NULL means unlimited) } #[derive(Clone, FromRow, Debug)] @@ -113,6 +115,8 @@ impl TryFrom<&SessionDao> for Session { state: ssn.state.try_into()?, }, events: vec![], + min_instances: ssn.min_instances as u32, // Convert i64 to u32 + max_instances: ssn.max_instances.map(|v| v as u32), // Convert Option to Option }) } } diff --git a/session_manager/src/storage/mod.rs b/session_manager/src/storage/mod.rs index 28bfce97..b251bba0 100644 --- a/session_manager/src/storage/mod.rs +++ b/session_manager/src/storage/mod.rs @@ -21,9 +21,9 @@ use stdng::{lock_ptr, logs::TraceFn, trace_fn, MutexPtr}; use common::apis::{ Application, ApplicationAttributes, ApplicationID, ApplicationPtr, CommonData, Event, - EventOwner, ExecutorID, ExecutorState, Node, NodePtr, ResourceRequirement, Session, SessionID, - SessionPtr, SessionState, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskPtr, TaskResult, - TaskState, + EventOwner, ExecutorID, ExecutorState, Node, NodePtr, ResourceRequirement, Session, + SessionAttributes, SessionID, SessionPtr, SessionState, Task, TaskGID, TaskID, TaskInput, + TaskOutput, TaskPtr, TaskResult, TaskState, }; use common::ctx::FlameClusterContext; use common::FlameError; @@ -185,18 +185,9 @@ impl Storage { Ok(()) } - pub async fn create_session( - &self, - id: SessionID, - app: String, - slots: u32, - common_data: Option, - ) -> Result { + pub async fn create_session(&self, attr: SessionAttributes) -> Result { trace_fn!("Storage::create_session"); - let ssn = self - .engine - .create_session(id, app, slots, common_data) - .await?; + let ssn = self.engine.create_session(attr).await?; let mut ssn_map = 
lock_ptr!(self.sessions)?;
        ssn_map.insert(ssn.id.clone(), SessionPtr::new(ssn.clone().into()));
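// Illustrative sketch: how a caller would exercise the new create_session path end to
// end. The function name, session id, application name, and the min/max values below
// are hypothetical; Controller, Session, SessionAttributes, and FlameError are the
// types touched by this patch, and the Result shape follows Controller::create_session
// in controller/mod.rs.
async fn create_capped_session(controller: &Controller) -> Result<Session, FlameError> {
    let attr = SessionAttributes {
        id: "ssn-capped-example".to_string(),
        application: "flmtest".to_string(),
        slots: 1,
        common_data: None,
        min_instances: 1,        // fairshare reserves at least one instance worth of slots
        max_instances: Some(4),  // AllocateAction stops binding executors once four are bound
    };
    // The controller forwards the attributes to Storage, which persists them in the new
    // min_instances / max_instances columns added by the migration above.
    controller.create_session(attr).await
}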
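// Illustrative sketch: the fairshare capping rule from plugins/fairshare.rs distilled
// into a pure helper, assuming the same semantics as the hunk above (demand is capped
// by max_instances when set, then raised to the min_instances guarantee, both measured
// in slots). The helper name and the example values are made up.
fn cap_desired(desired: f64, slots: u32, min_instances: u32, max_instances: Option<u32>) -> f64 {
    let capped = match max_instances {
        Some(max) => desired.min((max * slots) as f64),
        None => desired,
    };
    capped.max((min_instances * slots) as f64)
}

// With slots = 1, min_instances = 1, max_instances = Some(4):
//   cap_desired(10.0, 1, 1, Some(4)) == 4.0  // capped by the max_instances limit
//   cap_desired(0.0,  1, 1, Some(4)) == 1.0  // raised to the guaranteed minimum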