diff --git a/Cargo.lock b/Cargo.lock index 0099ceb7..27885ed6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1554,15 +1554,14 @@ dependencies = [ [[package]] name = "kameo" -version = "0.17.2" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41a73be96f616ca2784f597b5b6635582f5a7b3ba73b1dbe7afa5d9667955d39" +checksum = "ab031fd265d156f70b6af5b4c8738ef65007e9485d1d28aed5b041b1d4918688" dependencies = [ "downcast-rs", "dyn-clone", "futures", "kameo_macros", - "once_cell", "serde", "tokio", "tracing", @@ -1570,15 +1569,14 @@ dependencies = [ [[package]] name = "kameo_macros" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3f384b32bf6426ae93a8b37da62c85073b676a31a82a86d608ad86453878de0" +checksum = "07992e638a2eb9e2007f3ce6376d6b0ce61e5bebc385201e8dabf37a1804f3e0" dependencies = [ "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.106", - "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 37c44526..5a0119bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["bot", "entity", "migration"] resolver = "2" +members = ["bot", "entity", "migration"] [workspace.dependencies] anyhow = "1" @@ -8,31 +8,32 @@ chrono = "0.4" chrono-tz = "0.10" culpa = "1.0" dotenv = "0.15" -entity = { version = "0.4", path = "./entity" } +entity = { path = "./entity", version = "0.4" } fern = { version = "0.7", features = ["colored"] } futures = "0.3" include_dir = { version = "0.7.4", features = ["glob", "nightly"] } itertools = "0.14" -kameo = "0.17" +kameo = "0.18" libbot = { path = "./lib" } log = "0.4" -migration = { version = "0.4", path = "./migration" } +migration = { path = "./migration", version = "0.4" } paste = "1" # plurals = "0.3" regex = "1" sea-orm = { version = "1.1", features = [ - "macros", - "runtime-tokio-rustls", - "sqlx-postgres", - "with-chrono", + "macros", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", ] } sea-orm-migration = { version = "1.1", features = [ - "runtime-tokio-rustls", - "sqlx-postgres", - "with-chrono", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", ] } serde = { version = "1.0", features = ["derive"] } teloxide = { version = "0.17", features = ["macros"] } tera = "1" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -two_timer = { version = "2.2", git = "https://github.com/berkus/two-timer.git", branch = "updated" } +two_timer = { git = "https://github.com/berkus/two-timer.git", branch = "updated", version = "2.2" } + diff --git a/Justfile b/Justfile index fa0b3e44..f3412c84 100644 --- a/Justfile +++ b/Justfile @@ -1,3 +1,6 @@ +@default: + just --list + deploy: build cp target/release/bot ../aegl-bot/ build: diff --git a/bot/Cargo.toml b/bot/Cargo.toml index dfc2cac7..0b875f1c 100644 --- a/bot/Cargo.toml +++ b/bot/Cargo.toml @@ -7,8 +7,8 @@ publish = false [dependencies] anyhow.workspace = true -chrono-tz.workspace = true chrono.workspace = true +chrono-tz.workspace = true culpa.workspace = true dotenv.workspace = true entity.workspace = true diff --git a/bot/src/actors/bot_actor.rs b/bot/src/actors/bot_actor.rs index 5c94f303..9415fd1a 100644 --- a/bot/src/actors/bot_actor.rs +++ b/bot/src/actors/bot_actor.rs @@ -1,11 +1,10 @@ use { crate::{ - actors::reminder_actor::{ - ReminderActor, ScheduleNextDay, ScheduleNextMinute, ScheduleNextWeek, - }, + actors::reminder_actor::{ReminderActor, Reminders, ScheduleNextDay, ScheduleNextWeek}, commands::*, BotCommand, }, + culpa::throws, kameo::{ actor::ActorRef, error::Infallible, @@ -28,6 +27,7 @@ pub struct BotActor { update_sender: broadcast::Sender, connection_pool: DatabaseConnection, commands_list: Vec<(String, String)>, + reminders: Option>, // we must keep a ref, but late init } unsafe impl Send for BotActor {} @@ -67,6 +67,7 @@ impl BotActor { update_sender, connection_pool, commands_list: vec![], + reminders: None, } } @@ -134,6 +135,8 @@ impl Actor for BotActor { new_command!(ChatIdCommand, bot_actor); new_command!(D1weekCommand, bot_actor); new_command!(D2weekCommand, bot_actor); + #[cfg(debug_assertions)] + new_command!(DebugCommand, bot_actor); new_command!(EditCommand, bot_actor); new_command!(EditGuardianCommand, bot_actor); new_command!(HelpCommand, bot_actor); @@ -153,10 +156,13 @@ impl Actor for BotActor { )); // Schedule first run, the actor handler will reschedule. - let _ = reminders.tell(ScheduleNextMinute).await; + log::trace!("Scheduling first tick to {reminders:?}"); + let _ = reminders.tell(Reminders).await; let _ = reminders.tell(ScheduleNextDay).await; let _ = reminders.tell(ScheduleNextWeek).await; + bot_actor.reminders = Some(reminders); + Ok(bot_actor) } } @@ -185,6 +191,9 @@ pub enum Notify { On, } +#[derive(Clone, Debug)] +pub struct Debug; + #[derive(Clone, Debug)] pub struct SendMessage(pub String, pub ChatId, pub Format, pub Notify); @@ -289,3 +298,17 @@ impl Message for BotActor { .try_send(); // @todo use unbounded mailbox for bot_actor? prolly not } } + +impl Message for BotActor { + type Reply = anyhow::Result; + + #[throws(anyhow::Error)] + async fn handle(&mut self, _msg: Debug, _ctx: &mut Context) -> String { + log::debug!("Debug"); + + self.reminders.as_ref().unwrap().kill(); + + let res = self.reminders.as_ref().is_some_and(|r| r.is_alive()); + format!("{res}") + } +} diff --git a/bot/src/actors/reminder_actor.rs b/bot/src/actors/reminder_actor.rs index 78911561..bc476ed3 100644 --- a/bot/src/actors/reminder_actor.rs +++ b/bot/src/actors/reminder_actor.rs @@ -1,22 +1,93 @@ use { crate::actors::bot_actor::{Format, Notify, SendMessage}, - chrono::Timelike, + chrono::{DateTime, Timelike, Utc}, culpa::throws, entity::prelude::*, - kameo::{actor::ActorRef, error::Infallible, message::*, Actor}, + kameo::{ + actor::{ActorRef, WeakActorRef}, + error::Infallible, + mailbox, messages, + prelude::{MailboxReceiver, Message}, + Actor, + }, libbot::{ datetime::{d2_reset_time, reference_date, start_at_time, start_at_weekday_time}, services::destiny_schedule::{this_week_in_d1, this_week_in_d2}, }, + log::{error, trace, warn}, sea_orm::DatabaseConnection, + std::{ + cmp::Ordering, + collections::BinaryHeap, + time::{Duration, Instant}, + }, teloxide::types::ChatId, }; -#[derive(Clone)] +// 1. Daily resets at 20:00 MSK (17:00 UTC) every day +#[throws(kameo::error::SendError)] +pub async fn daily_reset(bot: ActorRef, lfg_chat: ChatId) { + bot.tell(SendMessage( + "⚡️ Daily reset".into(), + lfg_chat, + Format::Plain, + Notify::Off, + )) + .await?; +} + +// 2. Weekly (main) resets at 20:00 msk every Tue +// 6. On main reset: change in Dreaming City curse +// dreaming city on 3-week schedule +// 7. On main reset: change in Dreaming City Ascendant Challenges +// dreaming city challenges on 6-week schedule +#[throws(kameo::error::SendError)] +pub async fn major_weekly_reset( + bot: ActorRef, + lfg_chat: ChatId, +) { + let msg = format!( + "⚡️ Weekly reset:\n\n{d1week}\n\n{d2week}", + d1week = this_week_in_d1(), + d2week = this_week_in_d2(), + ); + bot.tell(SendMessage(msg, lfg_chat, Format::Markdown, Notify::Off)) + .await?; +} + +// #[actor(mailbox = unbounded)] pub struct ReminderActor { bot_ref: ActorRef, lfg_chat: i64, connection_pool: DatabaseConnection, + reminders: BinaryHeap, +} + +#[derive(PartialEq, Eq, Clone, Debug)] +enum Action { + Reminders, + DailyReset, + WeeklyReset, +} + +#[derive(PartialEq, Eq, Clone, Debug)] +struct ReminderJob { + send_at: Instant, + action: Action, +} + +// For a min-heap (earliest timestamp first) +impl Ord for ReminderJob { + fn cmp(&self, other: &Self) -> Ordering { + // Reverse the order for min-heap + other.send_at.cmp(&self.send_at) + } +} + +impl PartialOrd for ReminderJob { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } } impl Actor for ReminderActor { @@ -26,6 +97,64 @@ impl Actor for ReminderActor { async fn on_start(args: Self::Args, _actor_ref: ActorRef) -> Result { Ok(args) } + + async fn next( + &mut self, + actor_ref: WeakActorRef, + mailbox_rx: &mut MailboxReceiver, + ) -> Option> { + loop { + self.print_reminders(); + // @todo: pop and reinsert, or peek and then pop? which is more robust here? + if let Some(next) = self.reminders.peek() { + let duration = next.send_at.duration_since(Instant::now()); + trace!("Sleeping for {duration:?}"); + tokio::select! { + biased; + signal = mailbox_rx.recv() => { + trace!("Woken early from {duration:?} sleep"); + return signal; + }, + _ = tokio::time::sleep(duration) => { + if let Some(next) = self.reminders.pop() { + if let Some(actor) = actor_ref.upgrade() { + trace!("{duration:?} sleep completed, dispatching action"); + match next.action { + Action::Reminders => { let _ = actor.tell(Reminders).try_send(); } + Action::DailyReset => { let _ = actor.tell(DailyReset).try_send(); } + Action::WeeklyReset => { let _ = actor.tell(WeeklyReset).try_send(); } + } + } + } + } + } + } else { + // No reminder jobs... just receive the next message as usual + trace!("No reminder jobs, just receive the next message as usual"); + return mailbox_rx.recv().await; + } + } + } + + async fn on_stop( + &mut self, + _actor_ref: WeakActorRef, + reason: kameo::prelude::ActorStopReason, + ) -> Result<(), Self::Error> { + warn!("ReminderActor stopped! {reason}"); + Ok(()) + } + + async fn on_panic( + &mut self, + _actor_ref: WeakActorRef, + err: kameo::prelude::PanicError, + ) -> Result, Self::Error> { + error!("ReminderActor panicked! {err}"); + Ok(std::ops::ControlFlow::Break( + kameo::prelude::ActorStopReason::Panicked(err), + )) + } } impl ReminderActor { @@ -38,37 +167,89 @@ impl ReminderActor { bot_ref, lfg_chat, connection_pool, + reminders: BinaryHeap::new(), } } fn connection(&self) -> &DatabaseConnection { &self.connection_pool } + + fn schedule_at_time(&mut self, time: DateTime, action: Action) { + let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0_i64); + let delay = Duration::from_secs(delay as u64); + + let job = ReminderJob { + send_at: Instant::now() + delay, + action, + }; + + self.reminders.push(job); + } + + fn print_reminders(&self) { + let mut heap = self.reminders.clone(); + while let Some(x) = heap.pop() { + trace!("Reminder: {x:?}"); + } + } } -#[derive(Clone, Debug)] -pub struct Reminders; +#[messages] +impl ReminderActor { + #[message] + pub async fn schedule_next_minute(&mut self) -> anyhow::Result<()> { + self.schedule_at_time( + (reference_date() + chrono::Duration::minutes(1)) + .with_second(0) + .unwrap(), + Action::Reminders, + ); + Ok(()) + } -#[derive(Clone, Debug)] -pub struct DailyReset; + #[message] + pub async fn schedule_next_day(&mut self) -> anyhow::Result<()> { + self.schedule_at_time( + start_at_time(reference_date(), d2_reset_time()), + Action::DailyReset, + ); + Ok(()) + } -#[derive(Clone, Debug)] + #[message] + pub async fn schedule_next_week(&mut self) -> anyhow::Result<()> { + self.schedule_at_time( + start_at_weekday_time(reference_date(), chrono::Weekday::Tue, d2_reset_time()), + Action::WeeklyReset, + ); + Ok(()) + } +} + +pub struct Reminders; +pub struct DailyReset; pub struct WeeklyReset; impl Message for ReminderActor { - type Reply = (); + type Reply = anyhow::Result<()>; + #[throws(anyhow::Error)] async fn handle( &mut self, _msg: Reminders, - ctx: &mut Context, - ) -> Self::Reply { + ctx: &mut kameo::prelude::Context, + ) { let bot_ref = self.bot_ref.clone(); let connection = self.connection(); let lfg_chat = self.lfg_chat; + trace!("Received Reminders"); + let found = PlannedActivities::upcoming_activities_alert(connection).await; + trace!("Loaded planned activities"); + if let Some(upcoming_events) = found { // @Todo: this text should be populated in tera template in `bot` let text = upcoming_events @@ -87,127 +268,34 @@ impl Message for ReminderActor { .await; } - let _ = ctx.actor_ref().tell(ScheduleNextMinute).await; + ctx.actor_ref().tell(ScheduleNextMinute).try_send()?; } } -// 1. Daily resets at 20:00 MSK (17:00 UTC) every day -#[throws(kameo::error::SendError)] -pub async fn daily_reset(bot: ActorRef, lfg_chat: ChatId) { - bot.tell(SendMessage( - "⚡️ Daily reset".into(), - lfg_chat, - Format::Plain, - Notify::Off, - )) - .await?; -} - impl Message for ReminderActor { type Reply = anyhow::Result<()>; #[throws(anyhow::Error)] - async fn handle(&mut self, _msg: DailyReset, ctx: &mut Context) { + async fn handle( + &mut self, + _msg: DailyReset, + ctx: &mut kameo::prelude::Context, + ) { daily_reset(self.bot_ref.clone(), ChatId(self.lfg_chat)).await?; - ctx.actor_ref().tell(ScheduleNextDay).await?; + ctx.actor_ref().tell(ScheduleNextDay).try_send()?; } } -// 2. Weekly (main) resets at 20:00 msk every Tue -// 6. On main reset: change in Dreaming City curse -// dreaming city on 3-week schedule -// 7. On main reset: change in Dreaming City Ascendant Challenges -// dreaming city challenges on 6-week schedule -#[throws(kameo::error::SendError)] -pub async fn major_weekly_reset( - bot: ActorRef, - lfg_chat: ChatId, -) { - let msg = format!( - "⚡️ Weekly reset:\n\n{d1week}\n\n{d2week}", - d1week = this_week_in_d1(), - d2week = this_week_in_d2(), - ); - bot.tell(SendMessage(msg, lfg_chat, Format::Markdown, Notify::Off)) - .await?; -} - impl Message for ReminderActor { type Reply = anyhow::Result<()>; #[throws(anyhow::Error)] - async fn handle(&mut self, _msg: WeeklyReset, ctx: &mut Context) { - major_weekly_reset(self.bot_ref.clone(), ChatId(self.lfg_chat)).await?; - ctx.actor_ref().tell(ScheduleNextWeek).await?; - } -} - -#[derive(Clone, Debug)] -pub struct ScheduleNextMinute; - -#[derive(Clone, Debug)] -pub struct ScheduleNextDay; - -#[derive(Clone, Debug)] -pub struct ScheduleNextWeek; - -impl Message for ReminderActor { - type Reply = anyhow::Result<()>; - - #[throws(anyhow::Error)] - async fn handle(&mut self, _msg: ScheduleNextMinute, ctx: &mut Context) { - let target_time = (reference_date() + chrono::Duration::minutes(1)) - .with_second(0) - .unwrap(); - let actor_ref = ctx.actor_ref().clone(); - - let now = std::time::SystemTime::now(); - let target_system_time = - std::time::UNIX_EPOCH + std::time::Duration::from_secs(target_time.timestamp() as u64); - if let Ok(duration) = target_system_time.duration_since(now) { - tokio::time::sleep(duration).await; - actor_ref.tell(Reminders).try_send()?; - } - } -} - -impl Message for ReminderActor { - type Reply = (); async fn handle( &mut self, - _msg: ScheduleNextDay, - ctx: &mut Context, - ) -> Self::Reply { - let target_time = start_at_time(reference_date(), d2_reset_time()); - let actor_ref = ctx.actor_ref().clone(); - - let now = std::time::SystemTime::now(); - let target_system_time = - std::time::UNIX_EPOCH + std::time::Duration::from_secs(target_time.timestamp() as u64); - if let Ok(duration) = target_system_time.duration_since(now) { - tokio::time::sleep(duration).await; - let _ = actor_ref.tell(DailyReset).await; - } - } -} - -impl Message for ReminderActor { - type Reply = (); - async fn handle( - &mut self, - _msg: ScheduleNextWeek, - ctx: &mut Context, - ) -> Self::Reply { - let target_time = - start_at_weekday_time(reference_date(), chrono::Weekday::Tue, d2_reset_time()); - let actor_ref = ctx.actor_ref().clone(); - - let now = std::time::SystemTime::now(); - let target_system_time = - std::time::UNIX_EPOCH + std::time::Duration::from_secs(target_time.timestamp() as u64); - if let Ok(duration) = target_system_time.duration_since(now) { - tokio::time::sleep(duration).await; - let _ = actor_ref.tell(WeeklyReset).await; - } + _msg: WeeklyReset, + ctx: &mut kameo::prelude::Context, + ) { + major_weekly_reset(self.bot_ref.clone(), ChatId(self.lfg_chat)).await?; + ctx.actor_ref().tell(ScheduleNextWeek).try_send()?; } } diff --git a/bot/src/commands/debug_command.rs b/bot/src/commands/debug_command.rs new file mode 100644 index 00000000..3e05ff98 --- /dev/null +++ b/bot/src/commands/debug_command.rs @@ -0,0 +1,22 @@ +use { + crate::{ + actors::bot_actor::{ActorUpdateMessage, Debug}, + commands::match_command, + }, + culpa::throws, + kameo::message::Context, +}; + +command_actor!(DebugCommand, "debug", "Run debug operations"); + +impl Message for DebugCommand { + type Reply = anyhow::Result<()>; + + #[throws(anyhow::Error)] + async fn handle(&mut self, msg: ActorUpdateMessage, _ctx: &mut Context) { + if let (Some(_), _) = match_command(msg.update.text(), Self::prefix(), &self.bot_name) { + self.send_reply(&msg, format!("Alive: {}", self.bot_ref.ask(Debug).await?)) + .await; + } + } +} diff --git a/bot/src/commands/mod.rs b/bot/src/commands/mod.rs index dda8fcd7..b2c99bdd 100644 --- a/bot/src/commands/mod.rs +++ b/bot/src/commands/mod.rs @@ -115,6 +115,8 @@ mod chatid_command; pub use self::chatid_command::*; mod d2week_command; pub use self::d2week_command::*; +mod debug_command; +pub use self::debug_command::*; mod dweek_command; pub use self::dweek_command::*; mod edit_command;