Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tasklet"
version = "0.2.8"
version = "0.2.9"
authors = ["Stavros Grigoriou <unix121@protonmail.com>"]
edition = "2021"
rust-version = "1.85.1"
Expand All @@ -12,11 +12,11 @@ keywords = ["cron", "scheduling", "tasks", "tasklet", "async"]

[dependencies]
cron = "0.15.0"
chrono = "0.4.40"
time = "0.3.41"
chrono = "0.4.41"
log = "0.4.27"
tokio = { version = "1.44.1", features = ["full"] }
tokio = { version = "1.45.1", features = ["full"] }
futures = "0.3.31"
thiserror = "2.0.12"

[dev-dependencies]
simple_logger = "5.0.0"
Expand Down
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ in order to run tasks asynchronously.

## Dependencies

| library | version |
|---------|---------|
| cron | 0.15.0 |
| chrono | 0.4.40 |
| time | 0.3.41 |
| log | 0.4.27 |
| tokio | 1.44.1 |
| futures | 0.3.31 |
| library | version |
|-----------|---------|
| cron | 0.15.0 |
| chrono | 0.4.41 |
| log | 0.4.27 |
| tokio | 1.45.1 |
| futures | 0.3.31 |
| thiserror | 2.0.12 |

## How to use this library

In your `Cargo.toml` add:

```
[dependencies]
tasklet = "0.2.8"
tasklet = "0.2.9"
```

## Example
Expand Down Expand Up @@ -64,7 +64,7 @@ async fn main() {
// Create a task with 2 steps and add it to the scheduler.
// The second step fails every second execution.
// Append the task to the scheduler.
scheduler.add_task(
let _ = scheduler.add_task(
TaskBuilder::new(chrono::Local)
.every("1 * * * * * *")
.description("A simple task")
Expand Down
2 changes: 1 addition & 1 deletion examples/force_remove_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// Create a task with 2 steps and add it to the scheduler.
// The second step fails every second execution.
// Append the task to the scheduler.
scheduler.add_task(
let _ = scheduler.add_task(
TaskBuilder::new(chrono::Local)
.every("0,5,10,15,20,25,30,35,40,45,50,55 * * * * * *")
.description("A simple task")
Expand Down
2 changes: 1 addition & 1 deletion examples/one_task_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// Create a task with 2 steps and add it to the scheduler.
// The second step fails every second execution.
// Append the task to the scheduler.
scheduler.add_task(
let _ = scheduler.add_task(
TaskBuilder::new(chrono::Local)
.every("1 * * * * * *")
.description("A simple task")
Expand Down
2 changes: 1 addition & 1 deletion examples/read_me_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// Create a task with 2 steps and add it to the scheduler.
// The second step fails every second execution.
// Append the task to the scheduler.
scheduler.add_task(
let _ = scheduler.add_task(
TaskBuilder::new(chrono::Local)
.every("1 * * * * * *")
.description("A simple task")
Expand Down
4 changes: 3 additions & 1 deletion examples/simple_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async fn main() {
})
.build(),
)
.unwrap()
.add_task(
TaskBuilder::new(chrono::Utc)
.every("1, 10 , 20 * * * * * *")
Expand All @@ -41,7 +42,8 @@ async fn main() {
Ok(Success)
})
.build(),
);
)
.unwrap();

// Execute the tasks in the queue.
scheduler.run().await;
Expand Down
2 changes: 1 addition & 1 deletion examples/task_builder_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() {
let mut scheduler = TaskScheduler::new(500, Utc);

// Append a new task with two steps.
scheduler.add_task(
let _ = scheduler.add_task(
TaskBuilder::new(Utc)
.every("* * * * * *")
.description("Some description")
Expand Down
80 changes: 58 additions & 22 deletions src/builders.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::errors::{TaskError, TaskResult};
use crate::task::{Task, TaskStep, TaskStepStatusErr, TaskStepStatusOk};
use chrono::TimeZone;
use cron::Schedule;
Expand All @@ -16,6 +17,8 @@
/// The provided `Schedule`, if not given,
/// it will be defaulted to once every hour.
schedule: Option<Schedule>,
/// The original expression string, for error reporting
expression: String,
/// Max number of repeats.
repeats: Option<usize>,
/// The Task/Scheduler timezone.
Expand Down Expand Up @@ -43,6 +46,7 @@
steps: Vec::new(),
description: None,
schedule: None,
expression: "* * * * * * *".to_string(), // Default expression
repeats: None,
timezone,
}
Expand All @@ -56,7 +60,7 @@
///
/// ```rust
/// # use tasklet::TaskBuilder;
/// let _task = TaskBuilder::new(chrono::Local).every("* * * * * * *").description("Description").build();
/// let _task = TaskBuilder::new(chrono::Local).every("* * * * * * *").description("Description").build().unwrap();
/// ```
pub fn description(mut self, description: &str) -> TaskBuilder<T> {
self.description = Some(description.to_string());
Expand All @@ -73,10 +77,19 @@
///
/// ```rust
/// # use tasklet::{TaskBuilder, Task};
/// let _task = TaskBuilder::new(chrono::Local).every("* * * * * * *").build();
/// let _task = TaskBuilder::new(chrono::Local).every("* * * * * * *").build().unwrap();
/// ```
pub fn every(mut self, expression: &str) -> TaskBuilder<T> {
self.schedule = Some(expression.parse().unwrap());
self.expression = expression.to_string();
match expression.parse() {
Ok(schedule) => {
self.schedule = Some(schedule);
}
Err(_) => {
// We'll validate at build time
self.schedule = None;
}
};
self
}

Expand Down Expand Up @@ -144,24 +157,38 @@
///
/// ```rust
/// # use tasklet::{TaskBuilder, Task};
/// let mut _task = TaskBuilder::new(chrono::Utc).build();
/// let mut _task = TaskBuilder::new(chrono::Utc).build().unwrap();
/// ```
pub fn build(self) -> Task<T> {
pub fn build(self) -> TaskResult<Task<T>> {
// Validate schedule if provided
let schedule = match self.schedule {
Some(s) => s,
None => {

Check warning on line 166 in src/builders.rs

View check run for this annotation

Codecov / codecov/patch

src/builders.rs#L166

Added line #L166 was not covered by tests
// Try to parse the expression
self.expression.parse().map_err(|e| {
TaskError::InvalidCronExpression(format!(
"Invalid cron expression '{}': {}",
self.expression, e

Check warning on line 171 in src/builders.rs

View check run for this annotation

Codecov / codecov/patch

src/builders.rs#L170-L171

Added lines #L170 - L171 were not covered by tests
))
})?
}
};

// Create the task with default expression - we'll replace the schedule after
let mut task = Task::new(
"* * * * * * *",
match self.description {
Some(ref x) => Some(&x[..]),
None => None,
},
"* * * * * * *", // This is just a placeholder, we'll set the real schedule next
self.description.as_deref(),
self.repeats,
self.timezone,
);
task.set_schedule(
self.schedule
.unwrap_or_else(|| "* * * * * * *".parse().unwrap()),
);
)?;

// Set the validated schedule
task.set_schedule(schedule);

// Set the steps
task.set_steps(self.steps);
task

Ok(task)
}
}

Expand Down Expand Up @@ -197,7 +224,7 @@
#[test]
pub fn test_task_builder_init() {
let builder = TaskBuilder::new(chrono::Utc);
assert_none!(builder.repeats, builder.schedule, builder.description);
assert_none!(builder.repeats);
assert_eq!(builder.steps.len(), 0);
assert_eq!(builder.timezone, chrono::Utc);
}
Expand All @@ -206,7 +233,7 @@
#[test]
pub fn test_task_builder_with_description() {
let builder = TaskBuilder::new(chrono::Utc).description("Some description");
assert_none!(builder.repeats, builder.schedule);
assert_none!(builder.repeats);
assert_eq!(builder.steps.len(), 0);
assert_some!(builder.description);
assert_eq!(builder.timezone, chrono::Utc);
Expand All @@ -229,14 +256,12 @@
assert_eq!(builder.timezone, chrono::Utc);
assert_eq!(builder.steps.len(), 0);
assert_some!(builder.repeats);
assert_none!(builder.schedule, builder.description);
}

/// Test the normal functionality of the add_step() function of the `TaskBuilder`.
#[test]
pub fn test_task_builder_add_step() {
let builder = TaskBuilder::new(chrono::Utc).add_step_default(|| Ok(Success));
assert_none!(builder.schedule, builder.repeats, builder.description);
assert_eq!(builder.timezone, chrono::Utc);
assert_eq!(builder.steps.len(), 1);
}
Expand All @@ -249,7 +274,8 @@
.repeat(5)
.description("Some description")
.add_step("Step 1", || Ok(Success))
.build();
.build()
.unwrap();
assert_some!(task.repeats);
assert_eq!(task.description, "Some description");
assert_eq!(task.timezone, chrono::Utc);
Expand All @@ -262,9 +288,19 @@
let task = TaskBuilder::new(chrono::Utc)
.repeat(5)
.add_step("Step 1", || Ok(Success))
.build();
.build()
.unwrap();
assert_some!(task.repeats);
assert_eq!(task.timezone, chrono::Utc);
assert_eq!(task.steps.len(), 1);
}

/// Test building with an invalid cron expression
#[test]
pub fn test_task_builder_invalid_expression() {
let result = TaskBuilder::new(chrono::Utc)
.every("invalid expression")
.build();
assert!(result.is_err());
}
}
42 changes: 42 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! Error types for the tasklet library.

use thiserror::Error;

/// Errors that can occur when working with tasks.
#[derive(Error, Debug)]
pub enum TaskError {
/// The task is not initialized yet.
#[error("Task not initialized yet")]
NotInitialized,

/// The task has already been executed and must be rescheduled.
#[error("Task already executed and must be rescheduled")]
AlreadyExecuted,

/// The task has failed and must be rescheduled.
#[error("Task failed and must be rescheduled")]
Failed,

/// The task has finished and must be removed.
#[error("Task has finished and must be removed")]
Finished,

/// The task has been force removed.
#[error("Task has been force removed")]
ForceRemoved,

/// The task's schedule could not be parsed.
#[error("Invalid cron expression: {0}")]
InvalidCronExpression(String),

/// A required component is missing.
#[error("Missing required component: {0}")]
MissingComponent(String),

/// A generic error occurred during task execution.
#[error("Task execution error: {0}")]
ExecutionError(String),
}

/// Result type for task operations.
pub type TaskResult<T> = Result<T, TaskError>;
9 changes: 5 additions & 4 deletions src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate chrono;
extern crate cron;
use crate::errors::TaskResult;
use crate::task::Task;
use chrono::prelude::*;
use chrono::DateTime;
Expand All @@ -15,7 +16,7 @@ where
T: TimeZone + Send + 'static,
{
/// The discovery function, used to retrieve new tasks.
discovery_function: Box<dyn (FnMut() -> Option<Task<T>>)>,
discovery_function: Box<dyn (FnMut() -> Option<TaskResult<Task<T>>>)>,
/// The execution schedule.
schedule: Schedule,
/// The task generator's timezone.
Expand Down Expand Up @@ -47,7 +48,7 @@ where
/// ```
pub fn new<F>(expression: &str, timezone: T, function: F) -> TaskGenerator<T>
where
F: (FnMut() -> Option<Task<T>>) + 'static,
F: (FnMut() -> Option<TaskResult<Task<T>>>) + 'static,
{
let schedule: Schedule = expression.parse().unwrap();

Expand All @@ -60,7 +61,7 @@ where
}

/// Run the discovery function and reschedule the generation function.
pub(crate) fn run(&mut self) -> Option<Task<T>> {
pub(crate) fn run(&mut self) -> Option<TaskResult<Task<T>>> {
debug!("Executing discovery function");
self.next_exec = self.schedule.upcoming(self.timezone.clone()).next()?;
match (self.discovery_function)() {
Expand Down Expand Up @@ -92,7 +93,7 @@ mod test {
let mut task_gen = TaskGenerator::new("* * * * * * *", Local, || {
Some(Task::new("* * * * * * *", None, Some(1), Local))
});
assert!(!task_gen.run().is_none());
assert!(task_gen.run().is_some());
}

/// Test the normal flow of a task generation instance.
Expand Down
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
//! An asynchronous task scheduling library written in Rust.
//!
//! `tasklet` allows you to create scheduled tasks with specific execution patterns and
//! run them asynchronously using Tokio. It supports cron-like scheduling expressions and
//! provides a builder pattern for easy task creation.

mod builders;
pub mod errors;
mod generator;
mod scheduler;
pub mod task;

pub use builders::TaskBuilder;
pub use errors::{TaskError, TaskResult};
pub use generator::TaskGenerator;
pub use scheduler::TaskScheduler;
pub use task::Task;
Loading