Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tasklet"
version = "0.2.10"
version = "0.2.11"
authors = ["Stavros Grigoriou <unix121@protonmail.com>"]
edition = "2021"
rust-version = "1.90.0"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ In your `Cargo.toml` add:

```
[dependencies]
tasklet = "0.2.10"
tasklet = "0.2.11"
```

## Example
Expand Down
8 changes: 4 additions & 4 deletions src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
extern crate chrono;
extern crate cron;
use crate::errors::TaskResult;
use crate::scheduler_log;
use crate::task::Task;
use chrono::prelude::*;
use chrono::DateTime;
use cron::Schedule;
use log::debug;

/// Task generation structure.
///
Expand Down Expand Up @@ -62,16 +62,16 @@ where

/// Run the discovery function and reschedule the generation function.
pub(crate) fn run(&mut self) -> Option<TaskResult<Task<T>>> {
debug!("Executing discovery function");
scheduler_log!(log::Level::Debug, "Executing task discovery function");
self.next_exec = self.schedule.upcoming(self.timezone.clone()).next()?;
match (self.discovery_function)() {
Some(t) => {
// A task was generated and must be returned.
debug!("A task was found, adding it to the queue...");
scheduler_log!(log::Level::Debug, "Task discovered, adding to queue");
Some(t)
}
None => {
debug!("No task was generated");
scheduler_log!(log::Level::Debug, "No tasks were generated");
None
}
}
Expand Down
72 changes: 72 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,75 @@ pub use errors::{TaskError, TaskResult};
pub use generator::TaskGenerator;
pub use scheduler::TaskScheduler;
pub use task::Task;

/// Macro for consistent task-related logging
///
/// # Examples
///
/// ```
/// use tasklet::task_log;
/// use log::Level;
///
/// // Log an info message for task 1
/// task_log!(1, Level::Info, "Task started with parameter: {}", "example");
/// ```
#[macro_export]
macro_rules! task_log {
($task_id:expr, $level:expr, $message:expr $(, $args:expr)*) => {
match $level {
log::Level::Error => log::error!("[Task {}] {}", $task_id, format!($message $(, $args)*)),
log::Level::Warn => log::warn!("[Task {}] {}", $task_id, format!($message $(, $args)*)),
log::Level::Info => log::info!("[Task {}] {}", $task_id, format!($message $(, $args)*)),
log::Level::Debug => log::debug!("[Task {}] {}", $task_id, format!($message $(, $args)*)),
log::Level::Trace => log::trace!("[Task {}] {}", $task_id, format!($message $(, $args)*)),
}
};
}

/// Macro for consistent task step logging
///
/// # Examples
///
/// ```
/// use tasklet::step_log;
/// use log::Level;
///
/// // Log a debug message for task 1, step 2
/// step_log!(1, 2, Level::Debug, "Step completed successfully with result: {}", "success");
/// ```
#[macro_export]
macro_rules! step_log {
($task_id:expr, $step_idx:expr, $level:expr, $message:expr $(, $args:expr)*) => {
match $level {
log::Level::Error => log::error!("[Task {}-Step {}] {}", $task_id, $step_idx, format!($message $(, $args)*)),
log::Level::Warn => log::warn!("[Task {}-Step {}] {}", $task_id, $step_idx, format!($message $(, $args)*)),
log::Level::Info => log::info!("[Task {}-Step {}] {}", $task_id, $step_idx, format!($message $(, $args)*)),
log::Level::Debug => log::debug!("[Task {}-Step {}] {}", $task_id, $step_idx, format!($message $(, $args)*)),
log::Level::Trace => log::trace!("[Task {}-Step {}] {}", $task_id, $step_idx, format!($message $(, $args)*)),
}
};
}

/// Macro for consistent scheduler logging
///
/// # Examples
///
/// ```
/// use tasklet::scheduler_log;
/// use log::Level;
///
/// // Log a warning message from the scheduler
/// scheduler_log!(Level::Warn, "Failed to execute task with ID: {}", 5);
/// ```
#[macro_export]
macro_rules! scheduler_log {
($level:expr, $message:expr $(, $args:expr)*) => {
match $level {
log::Level::Error => log::error!("[Scheduler] {}", format!($message $(, $args)*)),
log::Level::Warn => log::warn!("[Scheduler] {}", format!($message $(, $args)*)),
log::Level::Info => log::info!("[Scheduler] {}", format!($message $(, $args)*)),
log::Level::Debug => log::debug!("[Scheduler] {}", format!($message $(, $args)*)),
log::Level::Trace => log::trace!("[Scheduler] {}", format!($message $(, $args)*)),
}
};
}
39 changes: 25 additions & 14 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::errors::{TaskError, TaskResult};
use crate::generator::TaskGenerator;
use crate::task::{run_task, Status, Task, TaskCmd, TaskResponse};
use crate::{scheduler_log, task_log};
use chrono::prelude::*;
use chrono::Utc;
use futures::future::join_all;
use futures::StreamExt;
use log::{debug, error, info};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -210,13 +210,14 @@ where
if res.status == Status::Finished || res.status == Status::ForceRemoved {
for handle in &self.handles {
if handle.id == res.id {
debug!(
"Killing task {} due {}",
task_log!(
res.id,
log::Level::Debug,
"Killing task due to {}",
if res.status == Status::Finished {
"to end of execution circle."
"end of execution cycle"
} else {
"force removal."
"force removal"
}
);
handle.handle.abort();
Expand Down Expand Up @@ -263,16 +264,19 @@ where
.iter_mut()
.filter(|h| h.id == r.id)
.for_each(|h| {
info!("Task with id {} initialized", h.id);
task_log!(h.id, log::Level::Info, "Initialized");
h.is_init = true;
});
}
_ => {
error!("Task with id {} failed to initialize", r.id);
task_log!(r.id, log::Level::Error, "Failed to initialize");
}
},
Err(_) => {
error!("RecvError returned by at least one uninitialized task")
scheduler_log!(
log::Level::Error,
"RecvError returned by at least one uninitialized task"
);
}
});
}
Expand Down Expand Up @@ -307,8 +311,9 @@ where
///
/// If there is a task generation/discovery method provided, executed on every loop.
pub async fn run(&mut self) {
info!(
"Scheduler started. Total tasks currently in queue: {}",
scheduler_log!(
log::Level::Info,
"Scheduler started. Total tasks in queue: {}",
self.handles.len()
);

Expand All @@ -322,12 +327,18 @@ where
}
match self.execute_tasks().await {
ExecutionStatus::Success(c) => {
info!("Execution round run successfully for {} total task(s)", c);
scheduler_log!(
log::Level::Info,
"Execution round completed successfully for {} task(s)",
c
);
}
ExecutionStatus::HadError(c, e) => {
error!(
"Execution round executed {} total task(s) and had {} total error(s)",
c, e
scheduler_log!(
log::Level::Error,
"Execution round ran {} task(s) with {} error(s)",
c,
e
);
}
_ => { /* No executions */ }
Expand Down
63 changes: 44 additions & 19 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ extern crate chrono;
extern crate cron;

use crate::errors::{TaskError, TaskResult};
use crate::{step_log, task_log};
use chrono::TimeZone;
use chrono::{DateTime, Utc};
use cron::Schedule;
use log::{debug, error, warn};
use std::fmt::{self, Debug};
use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -314,15 +314,15 @@ where
///
/// * id - The task's id.
pub(crate) fn init(&mut self) {
debug!("Task with id {} is initializing", self.task_id);
task_log!(self.task_id, log::Level::Debug, "Initializing");
self.next_exec = Some(
self.schedule
.upcoming(self.timezone.clone())
.next()
.unwrap(),
);
self.status = Status::Scheduled;
debug!("Task with id {} finished initializing", self.task_id);
task_log!(self.task_id, log::Level::Debug, "Finished initializing");
}

/// Create a `TaskResponse` from the current state of the task.
Expand Down Expand Up @@ -370,18 +370,32 @@ where
Status::Finished => Err(TaskError::Finished),
Status::ForceRemoved => Err(TaskError::ForceRemoved),
Status::Scheduled => {
debug!(
"[Task {}] [{}] is been executed...",
self.task_id, self.description
task_log!(
self.task_id,
log::Level::Debug,
"Executing '{}'",
self.description
);
let mut had_error: bool = false;
for (index, step) in self.steps.iter_mut().enumerate() {
if !had_error {
match (step.function)() {
Ok(status) => {
match status {
TaskStepStatusOk::Success => debug!("[Task step {}-{}] [{}] Executed successfully",self.task_id, index, step),
TaskStepStatusOk::HadErrors => debug!("[Task step {}-{}] [{}] Executed successfully but had some non fatal errors",self.task_id, index, step)
TaskStepStatusOk::Success => step_log!(
self.task_id,
index,
log::Level::Debug,
"Executed successfully - {}",
step
),
TaskStepStatusOk::HadErrors => step_log!(
self.task_id,
index,
log::Level::Debug,
"Executed with non-fatal errors - {}",
step
),
}
self.status = Status::Executed
}
Expand All @@ -390,14 +404,23 @@ where
had_error = true;
match status {
TaskStepStatusErr::Error => {
error!(
"[Task step {}-{}] [{}] Execution failed",
self.task_id, index, step
step_log!(
self.task_id,
index,
log::Level::Error,
"Execution failed - {}",
step
);
self.status = Status::Failed
}
TaskStepStatusErr::ErrorDelete => {
error!("[Task step {}-{}] [{}] Execution failed and the task is marked for deletion",self.task_id, index, step);
step_log!(
self.task_id,
index,
log::Level::Error,
"Execution failed and task is marked for deletion - {}",
step
);
self.status = Status::ForceRemoved
}
}
Expand Down Expand Up @@ -432,12 +455,13 @@ where
self.status = match self.repeats {
Some(t) => {
if t > 0 {
debug!("[Task {}] Has been rescheduled", self.task_id);
task_log!(self.task_id, log::Level::Debug, "Has been rescheduled");
Status::Scheduled
} else {
warn!(
"[Task {}] Has finished its execution cycle and will be removed",
self.task_id
task_log!(
self.task_id,
log::Level::Warn,
"Has finished its execution cycle and will be removed"
);
Status::Finished
}
Expand All @@ -447,9 +471,10 @@ where
Ok(())
}
Status::Finished | Status::ForceRemoved => {
warn!(
"[Task {}] The task will be removed from the queue",
self.task_id
task_log!(
self.task_id,
log::Level::Warn,
"Will be removed from the queue"
);
Ok(())
}
Expand Down