Skip to content

Commit 80121fc

Browse files
committed
feat: add orchestrator logic
1 parent b88ab0e commit 80121fc

5 files changed

Lines changed: 270 additions & 0 deletions

File tree

README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,57 @@ CREATE TABLE api_data(
8888
);
8989
```
9090

91+
## Orchestrator: Manager for Scheduling and Running Scripts ##
92+
93+
This module manages the execution of scheduled bots based on time intervals, specific times, or days. It uses a **ThreadPool** to run the scripts concurrently, ensuring efficient execution.
94+
95+
### Adding Schedules to Your Repository
96+
97+
To use the Orchestrator for scheduling and executing your bots, you need to define a `schedules` array in your repository. This array should include the paths to your scripts and the schedules for execution.
98+
99+
### Example of the `schedules` defintion
100+
101+
```ruby
102+
BIRTHDAY_SCHEDULES = [
103+
# Execute every 1000ms (1 second)
104+
{ path: '/birthday/fetch_birthday_from_notion.rb', interval: 1000 },
105+
{ path: '/birthday/format_birthday.rb', interval: 1000 },
106+
{ path: '/birthday/notify_birthday_in_discord.rb', interval: 1000 },
107+
{ path: '/birthday/garbage_collector.rb', interval: 1000 },
108+
].freeze
109+
110+
# With days and hours
111+
# Execute at 08:00 AM on Mondays
112+
{ path: '/birthday/notify_birthday_in_email.rb', day: ['Monday'], time: ['08:00'] }
113+
114+
```
115+
116+
### How to Use the Orchestrator
117+
118+
Once you've defined your schedules, you can initialize and run the Orchestrator to begin executing your scripts based on their schedules.
119+
120+
121+
```ruby
122+
123+
# Initialize the orchestrator with the defined schedules
124+
manager = Bas::Orchestrator::Manager.new(BIRTHDAY_SCHEDULES)
125+
126+
# Run the orchestrator
127+
manager.run
128+
129+
```
130+
### Folder structure example:
131+
132+
```bash
133+
src/use_cases_execution/
134+
├── birthday/
135+
│ ├── fetch_birthday_from_notion.rb
136+
│ ├── format_birthday.rb
137+
│ ├── notify_birthday_in_discord.rb
138+
│ └── garbage_collector.rb
139+
└── schedules.rb
140+
```
141+
91142
### Implementation examples
92143

93144
#### Example 1: Using the Same Shared Storage for Reading and Writing

lib/bas/orchestrator.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
require "bas/orchestrator/manager"
4+
module Bas
5+
# The Orchestrator module is responsible for managing the scheduling and execution
6+
# of scripts within the business automation services. It provides a high-level
7+
# interface to start the orchestration process using the `Manager` class.
8+
#
9+
module Orchestrator
10+
# Starts the orchestration process with the given schedules.
11+
#
12+
# @param schedules [Array<Hash>] A list of scripts with execution details.
13+
def self.start(schedules)
14+
manager = Manager.new(schedules)
15+
manager.run
16+
end
17+
end
18+
end

lib/bas/orchestrator/manager.rb

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# frozen_string_literal: true
2+
3+
require "concurrent-ruby"
4+
5+
module Bas
6+
module Orchestrator
7+
##
8+
# Manager class responsible for scheduling and executing scripts concurrently.
9+
#
10+
# This class initializes a thread pool and processes scheduled scripts based on
11+
# time intervals, specific days, or exact times.
12+
#
13+
class Manager
14+
def initialize(schedules)
15+
@last_executions = Hash.new(0.0)
16+
@schedules = schedules
17+
@pool = Concurrent::FixedThreadPool.new(@schedules.size)
18+
end
19+
20+
def run
21+
@schedules.each { |script| @pool.post { process_script(script) } }
22+
23+
@pool.shutdown
24+
@pool.wait_for_termination
25+
end
26+
27+
private
28+
29+
def process_script(script)
30+
loop do
31+
@actual_time = Time.new
32+
33+
execute_interval(script) if interval?(script)
34+
execute_day(script) if day?(script) && time?(script)
35+
execute_time(script) if time?(script) && !day?(script)
36+
37+
sleep 0.1
38+
rescue StandardError => e
39+
puts "Error in thread: #{e.message}"
40+
end
41+
end
42+
43+
def execute_interval(script)
44+
return unless time_in_milliseconds - @last_executions[script[:path]] >= script[:interval]
45+
46+
execute(script)
47+
@last_executions[script[:path]] = time_in_milliseconds
48+
end
49+
50+
def execute_day(script)
51+
return unless script[:day].include?(current_day) && script[:time].include?(current_time)
52+
53+
execute(script) unless @last_executions[script[:path]].eql?(current_time)
54+
@last_executions[script[:path]] = current_time
55+
end
56+
57+
def execute_time(script)
58+
execute(script) if script[:time].include?(current_time) && !@last_executions[script[:path]].eql?(current_time)
59+
@last_executions[script[:path]] = current_time
60+
end
61+
62+
def interval?(script)
63+
script[:interval]
64+
end
65+
66+
def time?(script)
67+
script[:time]
68+
end
69+
70+
def day?(script)
71+
script[:day]
72+
end
73+
74+
def time_in_milliseconds
75+
@actual_time.to_f * 1000
76+
end
77+
78+
def current_time
79+
@actual_time.strftime("%H:%M")
80+
end
81+
82+
def current_day
83+
@actual_time.strftime("%A")
84+
end
85+
86+
def execute(script)
87+
puts "Executing #{script[:path]} at #{current_time}"
88+
system("ruby ", script[:path])
89+
end
90+
end
91+
end
92+
end
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# frozen_string_literal: true
2+
3+
require "spec_helper"
4+
require "bas/orchestrator/manager"
5+
6+
RSpec.describe Bas::Orchestrator::Manager do
7+
let(:schedules) do
8+
[
9+
{ path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 },
10+
{ path: "websites_availability/notify_domain_availability.rb", interval: 60_000 },
11+
{ path: "websites_availability/garbage_collector.rb", time: ["00:00"] },
12+
{ path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] },
13+
{ path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] }
14+
]
15+
end
16+
17+
let(:manager) { described_class.new(schedules) }
18+
19+
before do
20+
allow(manager).to receive(:current_time).and_return("12:40")
21+
allow(manager).to receive(:current_day).and_return("Monday")
22+
allow(manager).to receive(:time_in_milliseconds).and_return(10_000)
23+
allow(manager).to receive(:system).and_return(true)
24+
end
25+
26+
describe "#execute_interval" do
27+
it "executes scripts when interval has elapsed" do
28+
script = schedules[0]
29+
manager.instance_variable_set(:@last_executions, { script[:path] => 0 })
30+
allow(manager).to receive(:time_in_milliseconds).and_return(600_000)
31+
32+
expect { manager.send(:execute_interval, script) }.to(change do
33+
manager.instance_variable_get(:@last_executions)[script[:path]]
34+
end)
35+
end
36+
37+
it "does not execute script if interval has not elapsed" do
38+
script = schedules[0]
39+
manager.instance_variable_set(:@last_executions, { script[:path] => 0 })
40+
allow(manager).to receive(:time_in_milliseconds).and_return(10_000)
41+
42+
expect { manager.send(:execute_interval, script) }.not_to(change do
43+
manager.instance_variable_get(:@last_executions)[script[:path]]
44+
end)
45+
end
46+
end
47+
48+
describe "#execute_time" do
49+
it "executes scripts at exact time" do
50+
script = schedules[2]
51+
allow(manager).to receive(:current_time).and_return("00:00")
52+
53+
expect { manager.send(:execute_time, script) }.to(change do
54+
manager.instance_variable_get(:@last_executions)[script[:path]]
55+
end)
56+
end
57+
end
58+
59+
describe "#execute_day" do
60+
it "executes scripts at specific time and day" do
61+
script = schedules[3]
62+
allow(manager).to receive(:current_time).and_return("12:40")
63+
allow(manager).to receive(:current_day).and_return("Monday")
64+
65+
expect { manager.send(:execute_day, script) }.to(change do
66+
manager.instance_variable_get(:@last_executions)[script[:path]]
67+
end)
68+
end
69+
70+
it "does not execute script if time is correct but the day is incorrect" do
71+
script = schedules[3]
72+
allow(manager).to receive(:current_time).and_return("12:40")
73+
allow(manager).to receive(:current_day).and_return("Tuesday")
74+
75+
expect { manager.send(:execute_day, script) }.not_to(change do
76+
manager.instance_variable_get(:@last_executions)[script[:path]]
77+
end)
78+
end
79+
end
80+
end

spec/bas/orchestrator_spec.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# frozen_string_literal: true
2+
3+
require "spec_helper"
4+
require "bas/orchestrator"
5+
6+
RSpec.describe Bas::Orchestrator do
7+
let(:schedules) do
8+
[
9+
{ path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 },
10+
{ path: "websites_availability/notify_domain_availability.rb", interval: 60_000 },
11+
{ path: "websites_availability/garbage_collector.rb", time: ["00:00"] },
12+
{ path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] },
13+
{ path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] }
14+
]
15+
end
16+
17+
let(:manager) { instance_double(Bas::Orchestrator::Manager, run: true) }
18+
19+
before do
20+
allow(Bas::Orchestrator::Manager).to receive(:new).with(schedules).and_return(manager)
21+
end
22+
23+
describe ".start" do
24+
it "initializes and runs the manager" do
25+
expect(manager).to receive(:run)
26+
described_class.start(schedules)
27+
end
28+
end
29+
end

0 commit comments

Comments
 (0)