Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ jobs:
run: mypy py_src/taskito/

rust-test:
needs: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
Expand All @@ -63,14 +62,16 @@ jobs:

- name: Rust cache
uses: Swatinem/rust-cache@v2.8.2
with:
save-if: false

- name: Run Rust tests
run: cargo test --workspace
env:
LD_LIBRARY_PATH: ${{ env.pythonLocation }}/lib

test:
needs: [lint, rust-test]
needs: lint
runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand Down Expand Up @@ -98,6 +99,8 @@ jobs:

- name: Rust cache
uses: Swatinem/rust-cache@v2.8.2
with:
save-if: ${{ matrix.os != 'ubuntu-latest' }}

- name: Create virtualenv (Unix)
if: runner.os != 'Windows'
Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/cleanup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Deletes all GitHub Actions caches associated with a pull request's merge
# ref once the PR is closed, freeing the repository's shared cache quota.
name: Cleanup PR caches

on:
  pull_request:
    types: [closed]

jobs:
  cleanup:
    runs-on: ubuntu-latest
    permissions:
      # `gh cache delete` needs write access to the Actions cache API.
      actions: write
    steps:
      - uses: actions/checkout@v6

      - name: Delete PR branch caches
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          # Caches created by PR workflows are scoped to the synthetic merge ref.
          BRANCH: refs/pull/${{ github.event.pull_request.number }}/merge
        run: |
          echo "Deleting caches for branch: $BRANCH"
          # --limit raised from the default of 30 so branches that accumulated
          # many caches are fully cleaned in one pass; `|| true` keeps the job
          # green when the branch has no caches or a delete races with expiry.
          gh cache list --ref "$BRANCH" --limit 100 --json id --jq '.[].id' |
            xargs -I {} gh cache delete {} --repo "${{ github.repository }}" || true
          echo "Done"
2 changes: 1 addition & 1 deletion crates/taskito-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-async"
version = "0.5.0"
version = "0.6.0"
edition = "2021"

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions crates/taskito-async/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl WorkerDispatcher for NativeAsyncPool {
task_name: job.task_name.clone(),
wall_time_ns: 0,
should_retry: true,
timed_out: false,
});
}
});
Expand Down
1 change: 1 addition & 0 deletions crates/taskito-async/src/result_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl PyResultSender {
task_name,
wall_time_ns,
should_retry,
timed_out: false,
});
}

Expand Down
9 changes: 8 additions & 1 deletion crates/taskito-async/src/task_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn execute_sync_task(
task_name,
wall_time_ns,
should_retry,
timed_out: false,
}
}
}
Expand Down Expand Up @@ -115,8 +116,14 @@ fn run_task(py: Python<'_>, task_registry: &PyObject, job: &Job) -> PyResult<Opt
)?;

let result = (|| -> PyResult<Bound<'_, pyo3::PyAny>> {
// Deserialize arguments using per-task or queue-level serializer
let payload_bytes = PyBytes::new_bound(py, &job.payload);
let unpickled = cloudpickle.call_method1("loads", (payload_bytes,))?;
let queue_ref = context_mod.getattr("_queue_ref")?;
let unpickled = if !queue_ref.is_none() {
queue_ref.call_method1("_deserialize_payload", (&job.task_name, &payload_bytes))?
} else {
cloudpickle.call_method1("loads", (&payload_bytes,))?
};
let args_tuple: Bound<'_, PyTuple> = unpickled.downcast_into()?;

if args_tuple.len() != 2 {
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-core"
version = "0.5.0"
version = "0.6.0"
edition = "2021"

[features]
Expand Down
3 changes: 2 additions & 1 deletion crates/taskito-core/src/scheduler/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ impl Scheduler {

for job in stale_jobs {
let error = format!("job timed out after {}ms", job.timeout_ms);
self.handle_result(JobResult::Failure {
let _ = self.handle_result(JobResult::Failure {
job_id: job.id.clone(),
error,
retry_count: job.retry_count,
max_retries: job.max_retries,
task_name: job.task_name.clone(),
wall_time_ns: 0,
should_retry: true,
timed_out: true,
})?;
}

Expand Down
47 changes: 47 additions & 0 deletions crates/taskito-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum JobResult {
task_name: String,
wall_time_ns: i64,
should_retry: bool,
timed_out: bool,
},
Cancelled {
job_id: String,
Expand All @@ -68,12 +69,45 @@ pub enum JobResult {
},
}

/// Outcome of processing a job result, returned to the caller for
/// Python-side middleware hook dispatch.
#[derive(Debug, Clone)]
pub enum ResultOutcome {
/// Task completed successfully.
Success { job_id: String, task_name: String },
/// Task failed and will be retried.
Retry {
job_id: String,
task_name: String,
error: String,
retry_count: i32,
timed_out: bool,
},
/// Task exhausted retries and moved to the dead-letter queue.
DeadLettered {
job_id: String,
task_name: String,
error: String,
timed_out: bool,
},
/// Task was cancelled during execution.
Cancelled { job_id: String, task_name: String },
}

/// Per-task configuration for retry, rate limiting, and circuit breaker.
#[derive(Debug, Clone)]
pub struct TaskConfig {
pub retry_policy: RetryPolicy,
pub rate_limit: Option<crate::resilience::rate_limiter::RateLimitConfig>,
pub circuit_breaker: Option<CircuitBreakerConfig>,
pub max_concurrent: Option<i32>,
}

/// Per-queue configuration for rate limiting and concurrency caps.
#[derive(Debug, Clone)]
pub struct QueueConfig {
pub rate_limit: Option<crate::resilience::rate_limiter::RateLimitConfig>,
pub max_concurrent: Option<i32>,
}

/// The central scheduler that coordinates job dispatch, retries, rate limiting, and circuit breakers.
Expand All @@ -83,6 +117,7 @@ pub struct Scheduler {
dlq: DeadLetterQueue,
circuit_breaker: CircuitBreaker,
task_configs: HashMap<String, TaskConfig>,
queue_configs: HashMap<String, QueueConfig>,
queues: Vec<String>,
config: SchedulerConfig,
shutdown: Arc<Notify>,
Expand All @@ -109,6 +144,7 @@ impl Scheduler {
dlq,
circuit_breaker,
task_configs: HashMap::new(),
queue_configs: HashMap::new(),
queues,
config,
shutdown: Arc::new(Notify::new()),
Expand All @@ -124,6 +160,10 @@ impl Scheduler {
self.shutdown.clone()
}

pub fn register_queue_config(&mut self, queue_name: String, config: QueueConfig) {
self.queue_configs.insert(queue_name, config);
}

pub fn register_task(&mut self, task_name: String, config: TaskConfig) {
if let Some(ref cb_config) = config.circuit_breaker {
if let Err(e) = self.circuit_breaker.register(&task_name, cb_config) {
Expand Down Expand Up @@ -276,6 +316,7 @@ mod tests {
},
rate_limit: None,
circuit_breaker: None,
max_concurrent: None,
},
);

Expand All @@ -290,6 +331,7 @@ mod tests {
task_name: "retry_task".to_string(),
wall_time_ns: 500_000,
should_retry: true,
timed_out: false,
})
.unwrap();

Expand All @@ -312,6 +354,7 @@ mod tests {
task_name: "exhausted_task".to_string(),
wall_time_ns: 100,
should_retry: true,
timed_out: false,
})
.unwrap();

Expand All @@ -337,6 +380,7 @@ mod tests {
task_name: "no_retry_task".to_string(),
wall_time_ns: 100,
should_retry: false,
timed_out: false,
})
.unwrap();

Expand Down Expand Up @@ -373,6 +417,7 @@ mod tests {
refill_rate: 0.0,
}),
circuit_breaker: None,
max_concurrent: None,
},
);

Expand Down Expand Up @@ -414,6 +459,7 @@ mod tests {
retry_policy: RetryPolicy::default(),
rate_limit: None,
circuit_breaker: Some(cb_config),
max_concurrent: None,
},
);

Expand Down Expand Up @@ -452,6 +498,7 @@ mod tests {
},
rate_limit: None,
circuit_breaker: None,
max_concurrent: None,
},
);

Expand Down
33 changes: 33 additions & 0 deletions crates/taskito-core/src/scheduler/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ const CIRCUIT_BREAKER_RETRY_DELAY_MS: i64 = 5000;
/// Delay before re-scheduling a rate-limited job (ms).
const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000;

/// Delay before re-scheduling a concurrency-limited job (ms).
const CONCURRENCY_RETRY_DELAY_MS: i64 = 500;

impl Scheduler {
pub(super) fn try_dispatch(&self, job_tx: &tokio::sync::mpsc::Sender<Job>) -> Result<bool> {
let now = now_millis();
Expand Down Expand Up @@ -53,6 +56,26 @@ impl Scheduler {
None => return Ok(false),
};

// Check queue-level limits
if let Some(qcfg) = self.queue_configs.get(&job.queue) {
if let Some(ref rl_config) = qcfg.rate_limit {
let key = format!("queue:{}", job.queue);
if !self.rate_limiter.try_acquire(&key, rl_config)? {
self.storage
.retry(&job.id, now + RATE_LIMIT_RETRY_DELAY_MS)?;
return Ok(false);
}
}
if let Some(max_conc) = qcfg.max_concurrent {
let stats = self.storage.stats_by_queue(&job.queue)?;
if stats.running >= max_conc as i64 {
self.storage
.retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?;
return Ok(false);
}
}
}

// Check circuit breaker for this task
if let Some(config) = self.task_configs.get(&job.task_name) {
if config.circuit_breaker.is_some() && !self.circuit_breaker.allow(&job.task_name)? {
Expand All @@ -68,6 +91,16 @@ impl Scheduler {
return Ok(false);
}
}

// Check per-task concurrency limit
if let Some(max_conc) = config.max_concurrent {
let running = self.storage.count_running_by_task(&job.task_name)?;
if running >= max_conc as i64 {
self.storage
.retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?;
return Ok(false);
}
}
}

// Claim exactly-once execution
Expand Down
Loading
Loading