Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 44 additions & 61 deletions crates/belt-daemon/src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,18 @@ impl CronEngine {
}
};

let builtin_names: &[&str] = &[
let global_builtin_names: &[&str] = &[
"hitl_timeout",
"daily_report",
"log_cleanup",
"evaluate",
"pr_review_scan",
];

let workspace_builtin_names: &[&str] = &[
"hitl_timeout",
"daily_report",
"log_cleanup",
"evaluate",
"gap_detection",
"knowledge_extraction",
];
Expand All @@ -372,11 +378,11 @@ impl CronEngine {
let custom_db_jobs: Vec<&belt_infra::db::CronJob> = db_jobs
.iter()
.filter(|j| {
if builtin_names.contains(&j.name.as_str()) {
if global_builtin_names.contains(&j.name.as_str()) {
return false;
}
if j.name.contains(':')
&& builtin_names
&& workspace_builtin_names
.iter()
.any(|b| j.name.ends_with(&format!(":{b}")))
{
Expand All @@ -397,11 +403,11 @@ impl CronEngine {
.iter()
.filter(|j| {
let name = j.name.as_str();
if builtin_names.contains(&name) {
if global_builtin_names.contains(&name) {
return false;
}
if name.contains(':')
&& builtin_names
&& workspace_builtin_names
.iter()
.any(|b| name.ends_with(&format!(":{b}")))
{
Expand Down Expand Up @@ -2985,14 +2991,6 @@ pub fn builtin_jobs(deps: BuiltinJobDeps) -> Vec<CronJobDef> {
worktree_mgr: Arc::clone(&deps.worktree_mgr),
}),
},
CronJobDef {
name: "evaluate".to_string(),
schedule: CronSchedule::Interval(Duration::from_secs(60)),
workspace: None,
enabled: true,
last_run_at: None,
handler: Box::new(EvaluateJob::new(Arc::clone(&deps.db))),
},
CronJobDef {
name: "pr_review_scan".to_string(),
schedule: CronSchedule::Interval(Duration::from_secs(5 * 60)),
Expand All @@ -3003,25 +3001,6 @@ pub fn builtin_jobs(deps: BuiltinJobDeps) -> Vec<CronJobDef> {
db: Arc::clone(&deps.db),
}),
},
CronJobDef {
name: "gap_detection".to_string(),
schedule: CronSchedule::Interval(Duration::from_secs(3600)),
workspace: None,
enabled: true,
last_run_at: None,
handler: Box::new(GapDetectionJob::new(
Arc::clone(&deps.db),
deps.workspace_root.clone(),
)),
},
CronJobDef {
name: "knowledge_extraction".to_string(),
schedule: CronSchedule::Interval(Duration::from_secs(3600)),
workspace: None,
enabled: true,
last_run_at: None,
handler: Box::new(KnowledgeExtractionJob::new(Arc::clone(&deps.db))),
},
]
}

Expand Down Expand Up @@ -3117,24 +3096,30 @@ pub fn load_custom_jobs(engine: &mut CronEngine, db: &Arc<Database>) {
}
};

let builtin_names = [
let global_builtin_names = [
"hitl_timeout",
"daily_report",
"log_cleanup",
"evaluate",
"pr_review_scan",
];

let workspace_builtin_names = [
"hitl_timeout",
"daily_report",
"log_cleanup",
"evaluate",
"gap_detection",
"knowledge_extraction",
];

for job in jobs {
// Skip built-in jobs (they are registered separately).
if builtin_names.contains(&job.name.as_str()) {
// Skip global built-in jobs (they are registered separately).
if global_builtin_names.contains(&job.name.as_str()) {
continue;
}
// Skip workspace-scoped built-in jobs (e.g. "billing:hitl_timeout").
if job.name.contains(':')
&& builtin_names
&& workspace_builtin_names
.iter()
.any(|b| job.name.ends_with(&format!(":{b}")))
{
Expand Down Expand Up @@ -3187,9 +3172,9 @@ pub fn load_custom_jobs(engine: &mut CronEngine, db: &Arc<Database>) {
/// - `HitlTimeoutJob` — every 1 hour
/// - `DailyReportJob` — every 24 hours
/// - `LogCleanupJob` — every 6 hours
/// - `EvaluateJob` — every 6 hours
/// - `GapDetectionJob` — every 12 hours
/// - `KnowledgeExtractionJob` — every 24 hours
/// - `EvaluateJob` — every 60 seconds
/// - `GapDetectionJob` — every 1 hour
/// - `KnowledgeExtractionJob` — every 1 hour
///
/// The `deps` parameter provides the shared dependencies (DB, worktree manager,
/// belt home, workspace name) used to initialise each job handler.
Expand Down Expand Up @@ -3234,7 +3219,7 @@ pub fn seed_workspace_crons(engine: &mut CronEngine, workspace: &str, deps: Buil

engine.register(CronJobDef {
name: format!("{ws}:evaluate"),
schedule: CronSchedule::Interval(Duration::from_secs(21600)), // every 6 hours
schedule: CronSchedule::Interval(Duration::from_secs(60)), // every 60 seconds
workspace: Some(ws.clone()),
enabled: true,
last_run_at: None,
Expand All @@ -3243,7 +3228,7 @@ pub fn seed_workspace_crons(engine: &mut CronEngine, workspace: &str, deps: Buil

engine.register(CronJobDef {
name: format!("{ws}:gap_detection"),
schedule: CronSchedule::Interval(Duration::from_secs(43200)), // every 12 hours
schedule: CronSchedule::Interval(Duration::from_secs(3600)), // every 1 hour
workspace: Some(ws.clone()),
enabled: true,
last_run_at: None,
Expand All @@ -3255,7 +3240,7 @@ pub fn seed_workspace_crons(engine: &mut CronEngine, workspace: &str, deps: Buil

engine.register(CronJobDef {
name: format!("{ws}:knowledge_extraction"),
schedule: CronSchedule::Interval(Duration::from_secs(86400)), // every 24 hours
schedule: CronSchedule::Interval(Duration::from_secs(3600)), // every 1 hour
workspace: Some(ws.clone()),
enabled: true,
last_run_at: None,
Expand Down Expand Up @@ -3475,16 +3460,13 @@ mod tests {
fn builtin_jobs_are_valid() {
let deps = make_test_deps();
let jobs = builtin_jobs(deps);
assert_eq!(jobs.len(), 7);
assert_eq!(jobs.len(), 4);

let names: Vec<&str> = jobs.iter().map(|j| j.name.as_str()).collect();
assert!(names.contains(&"hitl_timeout"));
assert!(names.contains(&"daily_report"));
assert!(names.contains(&"log_cleanup"));
assert!(names.contains(&"evaluate"));
assert!(names.contains(&"pr_review_scan"));
assert!(names.contains(&"gap_detection"));
assert!(names.contains(&"knowledge_extraction"));
}

#[test]
Expand Down Expand Up @@ -3852,8 +3834,13 @@ fn middleware(request: Request, secret: &[u8], rules: &[ValidationRule]) -> Resp

#[test]
fn gap_detection_has_hourly_schedule() {
let jobs = builtin_jobs(make_test_deps());
let gap = jobs.iter().find(|j| j.name == "gap_detection").unwrap();
let mut engine = CronEngine::new();
seed_workspace_crons(&mut engine, "test", make_test_deps());
let gap = engine
.jobs
.iter()
.find(|j| j.name == "test:gap_detection")
.unwrap();
match &gap.schedule {
CronSchedule::Interval(d) => assert_eq!(d.as_secs(), 3600),
_ => panic!("expected Interval schedule"),
Expand All @@ -3862,10 +3849,12 @@ fn middleware(request: Request, secret: &[u8], rules: &[ValidationRule]) -> Resp

#[test]
fn knowledge_extraction_has_hourly_schedule() {
let jobs = builtin_jobs(make_test_deps());
let ke = jobs
let mut engine = CronEngine::new();
seed_workspace_crons(&mut engine, "test", make_test_deps());
let ke = engine
.jobs
.iter()
.find(|j| j.name == "knowledge_extraction")
.find(|j| j.name == "test:knowledge_extraction")
.unwrap();
match &ke.schedule {
CronSchedule::Interval(d) => assert_eq!(d.as_secs(), 3600),
Expand Down Expand Up @@ -5076,26 +5065,20 @@ fn middleware(request: Request, secret: &[u8], rules: &[ValidationRule]) -> Resp
#[test]
fn load_custom_jobs_skips_builtin_names() {
let db = Arc::new(Database::open_in_memory().unwrap());
// Add jobs with builtin names — should be skipped.
// Add jobs with global builtin names — should be skipped.
db.add_cron_job("hitl_timeout", "*/5 * * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("daily_report", "0 6 * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("log_cleanup", "0 */6 * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("evaluate", "* * * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("pr_review_scan", "*/30 * * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("gap_detection", "0 * * * *", "/bin/noop.sh", None)
.unwrap();
db.add_cron_job("knowledge_extraction", "0 * * * *", "/bin/noop.sh", None)
.unwrap();

let mut engine = CronEngine::new();
load_custom_jobs(&mut engine, &db);

// All builtin names should be skipped.
// All global builtin names should be skipped.
assert_eq!(engine.job_count(), 0);
}

Expand Down
Loading