From 861a8e33723928fb358aa41c85d8f4a865f5f945 Mon Sep 17 00:00:00 2001 From: Alan Alvarez Date: Fri, 7 Feb 2025 11:03:31 -0600 Subject: [PATCH] feat: add orchestrator logic --- README.md | 51 +++++++++++++++ lib/bas/orchestrator.rb | 18 ++++++ lib/bas/orchestrator/manager.rb | 92 +++++++++++++++++++++++++++ spec/bas/orchestrator/manager_spec.rb | 80 +++++++++++++++++++++++ spec/bas/orchestrator_spec.rb | 29 +++++++++ 5 files changed, 270 insertions(+) create mode 100644 lib/bas/orchestrator.rb create mode 100644 lib/bas/orchestrator/manager.rb create mode 100644 spec/bas/orchestrator/manager_spec.rb create mode 100644 spec/bas/orchestrator_spec.rb diff --git a/README.md b/README.md index 74a5443..e68d451 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,57 @@ CREATE TABLE api_data( ); ``` +## Orchestrator: Manager for Scheduling and Running Scripts ## + +This module manages the execution of scheduled bots based on time intervals, specific times, or days. It uses a **ThreadPool** to run the scripts concurrently, ensuring efficient execution. + +### Adding Schedules to Your Repository + +To use the Orchestrator for scheduling and executing your bots, you need to define a `schedules` array in your repository. This array should include the paths to your scripts and the schedules for execution. + +### Example of the `schedules` defintion + +```ruby +BIRTHDAY_SCHEDULES = [ + # Execute every 1000ms (1 second) + { path: '/birthday/fetch_birthday_from_notion.rb', interval: 1000 }, + { path: '/birthday/format_birthday.rb', interval: 1000 }, + { path: '/birthday/notify_birthday_in_discord.rb', interval: 1000 }, + { path: '/birthday/garbage_collector.rb', interval: 1000 }, + ].freeze + + # With days and hours + # Execute at 08:00 AM on Mondays + { path: '/birthday/notify_birthday_in_email.rb', day: ['Monday'], time: ['08:00'] } + +``` + +### How to Use the Orchestrator + +Once you've defined your schedules, you can initialize and run the Orchestrator to begin executing your scripts based on their schedules. + + +```ruby + +# Initialize the orchestrator with the defined schedules +manager = Bas::Orchestrator::Manager.new(BIRTHDAY_SCHEDULES) + +# Run the orchestrator +manager.run + +``` +### Folder structure example: + +```bash +src/use_cases_execution/ +├── birthday/ +│ ├── fetch_birthday_from_notion.rb +│ ├── format_birthday.rb +│ ├── notify_birthday_in_discord.rb +│ └── garbage_collector.rb +└── schedules.rb +``` + ### Implementation examples #### Example 1: Using the Same Shared Storage for Reading and Writing diff --git a/lib/bas/orchestrator.rb b/lib/bas/orchestrator.rb new file mode 100644 index 0000000..cf888b2 --- /dev/null +++ b/lib/bas/orchestrator.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require "bas/orchestrator/manager" +module Bas + # The Orchestrator module is responsible for managing the scheduling and execution + # of scripts within the business automation services. It provides a high-level + # interface to start the orchestration process using the `Manager` class. + # + module Orchestrator + # Starts the orchestration process with the given schedules. + # + # @param schedules [Array] A list of scripts with execution details. + def self.start(schedules) + manager = Manager.new(schedules) + manager.run + end + end +end diff --git a/lib/bas/orchestrator/manager.rb b/lib/bas/orchestrator/manager.rb new file mode 100644 index 0000000..d8fed33 --- /dev/null +++ b/lib/bas/orchestrator/manager.rb @@ -0,0 +1,92 @@ +# frozen_string_literal: true + +require "concurrent-ruby" + +module Bas + module Orchestrator + ## + # Manager class responsible for scheduling and executing scripts concurrently. + # + # This class initializes a thread pool and processes scheduled scripts based on + # time intervals, specific days, or exact times. + # + class Manager + def initialize(schedules) + @last_executions = Hash.new(0.0) + @schedules = schedules + @pool = Concurrent::FixedThreadPool.new(@schedules.size) + end + + def run + @schedules.each { |script| @pool.post { process_script(script) } } + + @pool.shutdown + @pool.wait_for_termination + end + + private + + def process_script(script) + loop do + @actual_time = Time.new + + execute_interval(script) if interval?(script) + execute_day(script) if day?(script) && time?(script) + execute_time(script) if time?(script) && !day?(script) + + sleep 0.1 + rescue StandardError => e + puts "Error in thread: #{e.message}" + end + end + + def execute_interval(script) + return unless time_in_milliseconds - @last_executions[script[:path]] >= script[:interval] + + execute(script) + @last_executions[script[:path]] = time_in_milliseconds + end + + def execute_day(script) + return unless script[:day].include?(current_day) && script[:time].include?(current_time) + + execute(script) unless @last_executions[script[:path]].eql?(current_time) + @last_executions[script[:path]] = current_time + end + + def execute_time(script) + execute(script) if script[:time].include?(current_time) && !@last_executions[script[:path]].eql?(current_time) + @last_executions[script[:path]] = current_time + end + + def interval?(script) + script[:interval] + end + + def time?(script) + script[:time] + end + + def day?(script) + script[:day] + end + + def time_in_milliseconds + @actual_time.to_f * 1000 + end + + def current_time + @actual_time.strftime("%H:%M") + end + + def current_day + @actual_time.strftime("%A") + end + + def execute(script) + puts "Executing #{script[:path]} at #{current_time}" + system("ruby ", script[:path]) + end + end + end +end diff --git a/spec/bas/orchestrator/manager_spec.rb b/spec/bas/orchestrator/manager_spec.rb new file mode 100644 index 0000000..4a78156 --- /dev/null +++ b/spec/bas/orchestrator/manager_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "spec_helper" +require "bas/orchestrator/manager" + +RSpec.describe Bas::Orchestrator::Manager do + let(:schedules) do + [ + { path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 }, + { path: "websites_availability/notify_domain_availability.rb", interval: 60_000 }, + { path: "websites_availability/garbage_collector.rb", time: ["00:00"] }, + { path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] }, + { path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] } + ] + end + + let(:manager) { described_class.new(schedules) } + + before do + allow(manager).to receive(:current_time).and_return("12:40") + allow(manager).to receive(:current_day).and_return("Monday") + allow(manager).to receive(:time_in_milliseconds).and_return(10_000) + allow(manager).to receive(:system).and_return(true) + end + + describe "#execute_interval" do + it "executes scripts when interval has elapsed" do + script = schedules[0] + manager.instance_variable_set(:@last_executions, { script[:path] => 0 }) + allow(manager).to receive(:time_in_milliseconds).and_return(600_000) + + expect { manager.send(:execute_interval, script) }.to(change do + manager.instance_variable_get(:@last_executions)[script[:path]] + end) + end + + it "does not execute script if interval has not elapsed" do + script = schedules[0] + manager.instance_variable_set(:@last_executions, { script[:path] => 0 }) + allow(manager).to receive(:time_in_milliseconds).and_return(10_000) + + expect { manager.send(:execute_interval, script) }.not_to(change do + manager.instance_variable_get(:@last_executions)[script[:path]] + end) + end + end + + describe "#execute_time" do + it "executes scripts at exact time" do + script = schedules[2] + allow(manager).to receive(:current_time).and_return("00:00") + + expect { manager.send(:execute_time, script) }.to(change do + manager.instance_variable_get(:@last_executions)[script[:path]] + end) + end + end + + describe "#execute_day" do + it "executes scripts at specific time and day" do + script = schedules[3] + allow(manager).to receive(:current_time).and_return("12:40") + allow(manager).to receive(:current_day).and_return("Monday") + + expect { manager.send(:execute_day, script) }.to(change do + manager.instance_variable_get(:@last_executions)[script[:path]] + end) + end + + it "does not execute script if time is correct but the day is incorrect" do + script = schedules[3] + allow(manager).to receive(:current_time).and_return("12:40") + allow(manager).to receive(:current_day).and_return("Tuesday") + + expect { manager.send(:execute_day, script) }.not_to(change do + manager.instance_variable_get(:@last_executions)[script[:path]] + end) + end + end +end diff --git a/spec/bas/orchestrator_spec.rb b/spec/bas/orchestrator_spec.rb new file mode 100644 index 0000000..ec428b6 --- /dev/null +++ b/spec/bas/orchestrator_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require "spec_helper" +require "bas/orchestrator" + +RSpec.describe Bas::Orchestrator do + let(:schedules) do + [ + { path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 }, + { path: "websites_availability/notify_domain_availability.rb", interval: 60_000 }, + { path: "websites_availability/garbage_collector.rb", time: ["00:00"] }, + { path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] }, + { path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] } + ] + end + + let(:manager) { instance_double(Bas::Orchestrator::Manager, run: true) } + + before do + allow(Bas::Orchestrator::Manager).to receive(:new).with(schedules).and_return(manager) + end + + describe ".start" do + it "initializes and runs the manager" do + expect(manager).to receive(:run) + described_class.start(schedules) + end + end +end