From b3295efeab9167811c961f753be88e3c663d7a98 Mon Sep 17 00:00:00 2001 From: akadusei Date: Wed, 7 Jan 2026 00:47:20 +0000 Subject: [PATCH 01/13] Add `Mel::Postgres` adapter --- .github/workflows/test.yml | 34 ++++- CHANGELOG.md | 4 + README.md | 28 +++- shard.yml | 5 +- spec/setup/worker.cr | 7 +- spec/spec_helper.cr | 1 + src/postgres.cr | 304 +++++++++++++++++++++++++++++++++++++ 7 files changed, 377 insertions(+), 6 deletions(-) create mode 100644 src/postgres.cr diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 59d5eb4..877bf9a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -39,14 +39,17 @@ jobs: strategy: fail-fast: false matrix: - crystal: ['1.0.0'] + cockroachdb: ['v22.2.0', latest] + crystal: ['1.4.0'] experimental: [false] shard_file: [shard.yml] include: - - crystal: latest + - cockroachdb: latest + crystal: latest experimental: false shard_file: shard.latest.yml - - crystal: nightly + - cockroachdb: latest + crystal: nightly experimental: true shard_file: shard.edge.yml runs-on: ubuntu-latest @@ -74,7 +77,32 @@ jobs: sudo apt -y install redis - name: Start Redis run: sudo systemctl start redis + - name: Install Postgresql + run: | + sudo apt update + sudo apt -y install postgresql + - name: Start Postgresql + run: sudo systemctl start postgresql + - name: Set Postgres password + run: >- + sudo -u postgres psql -c + "ALTER USER postgres WITH PASSWORD 'password';" + - name: Install CockroachDB + run: | + sudo apt update + sudo apt -y install tar wget + sudo mkdir -p /usr/local/lib/cockroach + wget -O cockroachdb.tgz https://binaries.cockroachdb.com/cockroach-${{ matrix.cockroachdb }}.linux-amd64.tgz + tar -xzf cockroachdb.tgz + sudo cp -f cockroach-*/cockroach /usr/local/bin/ + sudo chmod +x /usr/local/bin/cockroach + sudo cp -rf cockroach-*/lib/* /usr/local/lib/cockroach/ + working-directory: /tmp + - name: Start CockroachDB + run: cockroach start-single-node --insecure --listen-addr=localhost:36257 --sql-addr=localhost:26257 --background - name: Run tests env: + COCKROACH_URL: postgres://root@localhost:26257/mel_spec?sslmode=disable + POSTGRES_URL: postgres://postgres:password@localhost:5432/mel_spec REDIS_URL: redis://localhost:6379/0 run: crystal spec --error-on-warnings -Dpreview_mt diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d5efd4..ed763c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] - ### Added +- Add `Mel::Postgres` adapter - Add `#be_enqueued` spec expectation +### Changed +- Bump minimum required Crystal version to 1.4 + ### Changed - Move Lua script into its own file diff --git a/README.md b/README.md index 75c631a..81206a5 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ This makes the storage backend the *source of truth* for schedules, allowing to github: GrottoPress/mel #redis: # Uncomment if using the Redis backend # github: jgaskins/redis + #pg: # Uncomment if using the Posgres backend + # github: will/crystal-pg ``` 1. Run `shards update` @@ -75,6 +77,28 @@ This makes the storage backend the *source of truth* for schedules, allowing to # ... ``` + - Using the Postgres backend + + ```crystal + # ->>> src/app/config.cr + + # ... + + require "mel/postgres" + + Mel.configure do |settings| + # ... + settings.store = Mel::Postgres.new( + "postgres://username:password@localhost:5432/database_name", + namespace: "mel", + setup: true # <= Creates database + ) + # ... + end + + # ... + ``` + - Using the Memory backend (Not for production use) ```crystal @@ -800,8 +824,10 @@ If it is a negative integer `-N` (other than `-1`), the number of due tasks pul Create a `.env.sh` file: ```bash -#!/bin/bash +#!/usr/bin/env bash +export COCKROACH_URL='postgres://root@localhost:26257/mel_spec?sslmode=disable' +export POSTGRES_URL='postgres://postgres:password@localhost:5432/mel_spec' export REDIS_URL='redis://localhost:6379/0' ``` diff --git a/shard.yml b/shard.yml index c6f9387..dfd9b7c 100644 --- a/shard.yml +++ b/shard.yml @@ -8,7 +8,7 @@ authors: license: MIT -crystal: ~> 1.0 +crystal: ~> 1.4 dependencies: cron_parser: @@ -25,6 +25,9 @@ development_dependencies: carbon: github: luckyframework/carbon version: ~> 0.2.0 + pg: + github: will/crystal-pg + version: ~> 0.29.0 redis: github: jgaskins/redis version: ~> 0.8.0 diff --git a/spec/setup/worker.cr b/spec/setup/worker.cr index 1a4be1b..549fc99 100644 --- a/spec/setup/worker.cr +++ b/spec/setup/worker.cr @@ -14,7 +14,12 @@ end Spec.around_each do |spec| next spec.run if all_tags(spec.example).includes?("skip_around_each") - {Mel::Memory.new, Mel::Redis.new(ENV["REDIS_URL"])}.each do |store| + { + Mel::Memory.new, + Mel::Postgres.new(ENV["COCKROACH_URL"], setup: true), + Mel::Postgres.new(ENV["POSTGRES_URL"], setup: true), + Mel::Redis.new(ENV["REDIS_URL"]) + }.each do |store| Mel.settings.store = store tasks.call spec.run diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 03734cb..53822c4 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -8,6 +8,7 @@ require "../src/spec" require "./setup/**" require "./support/**" require "../src/carbon" +require "../src/postgres" require "../src/redis" include Carbon::Expectations diff --git a/src/postgres.cr b/src/postgres.cr new file mode 100644 index 0000000..142ef66 --- /dev/null +++ b/src/postgres.cr @@ -0,0 +1,304 @@ +require "pg" + +require "./mel" + +module Mel + struct Postgres + include Store + + getter :client, :progress_table, :tasks_table + + def initialize( + @client : DB::Database, + @namespace : Symbol | String = :mel + ) + @progress_table = "#{@namespace}_progress" + @tasks_table = "#{@namespace}_tasks" + + create_tables + end + + def self.new(url, namespace = :mel, *, setup = false) + create_database(url) if setup + + new DB.open(url), namespace + end + + def self.create_database(url : String) + create_database URI.parse(url) + end + + def self.create_database(url : URI) + db_name = url.path.lchop('/') + + default_url = URI.parse(url.to_s) + default_url.path = "/postgres" + + DB.connect(default_url) do |connection| + create_database(connection, db_name) + end + end + + def self.delete_database(url : String) + delete_database URI.parse(url) + end + + def self.delete_database(url : URI) + db_name = url.path.lchop('/') + + default_url = URI.parse(url.to_s) + default_url.path = "/postgres" + + DB.connect(default_url) do |connection| + delete_database(connection, db_name) + end + end + + def find_due( + at time = Time.local, + count : Int = -1, *, + delete : Bool? = false + ) : Array(String)? + return if count.zero? + + running_select_sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE score >= $1 AND score <= $2 + ORDER BY score LIMIT $3 FOR UPDATE SKIP LOCKED; + SQL + + running_update_sql = <<-SQL + UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); + SQL + + select_sql = <<-SQL + SELECT data FROM #{tasks_table} WHERE score >= $1 AND score <= $2 + ORDER BY score LIMIT $3; + SQL + + limit = count > 0 ? count : nil + + with_transaction do |connection| + if delete.nil? + values = connection.query_all( + running_select_sql, + orphan_score, + time.to_unix, + limit, + as: {id: String, data: String} + ) + + data = values.map(&.[:data]) + ids = values.map(&.[:id]) + + connection.exec(running_update_sql, running_score, ids) + RunPool.update(ids) + + break data.empty? ? nil : data + end + + data = connection.query_all( + select_sql, + 0, + time.to_unix, + limit, + as: String + ) + + data unless data.empty? + end + end + + def find(count : Int, *, delete : Bool? = false) : Array(String)? + return if count.zero? + + running_select_sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE score >= $1 + ORDER BY score LIMIT $2 FOR UPDATE SKIP LOCKED; + SQL + + running_update_sql = <<-SQL + UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); + SQL + + select_sql = <<-SQL + SELECT data FROM #{tasks_table} WHERE score >= $1 + ORDER BY score LIMIT $2; + SQL + + limit = count > 0 ? count : nil + + with_transaction do |connection| + if delete.nil? + values = connection.query_all( + running_select_sql, + orphan_score, + limit, + as: {id: String, data: String} + ) + + data = values.map(&.[:data]) + ids = values.map(&.[:id]) + + connection.exec(running_update_sql, running_score, ids) + RunPool.update(ids) + + break data.empty? ? nil : data + end + + data = connection.query_all(select_sql, 0, limit, as: String) + data unless data.empty? + end + end + + def find(ids : Indexable, *, delete : Bool = false) : Array(String)? + return if ids.empty? + + with_transaction do |connection| + data = connection.query_all <<-SQL, ids.map(&.to_s), as: String + SELECT data FROM #{tasks_table} WHERE id = ANY($1); + SQL + + if delete + connection.exec <<-SQL, ids.map(&.to_s) + DELETE FROM #{tasks_table} WHERE id = ANY($1); + SQL + end + + data unless data.empty? + end + end + + def transaction(& : Transaction -> _) + with_transaction do |connection| + yield Transaction.new(self, connection) + end + end + + def truncate + with_connection do |connection| + connection.exec <<-SQL + TRUNCATE TABLE #{tasks_table}; + SQL + end + end + + def get_progress(ids : Indexable) : Array(String)? + return if ids.empty? + + data = with_transaction do |connection| + connection.exec <<-SQL + DELETE FROM #{progress_table} + WHERE expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP; + SQL + + connection.query_all <<-SQL, ids.map(&.to_s), as: String + SELECT data FROM #{progress_table} + WHERE id = ANY($1) + AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP); + SQL + end + + data.try { |_data| _data unless _data.empty? } + end + + def truncate_progress + with_connection do |connection| + connection.exec <<-SQL + TRUNCATE TABLE #{progress_table}; + SQL + end + end + + private def create_tables + with_transaction do |connection| + connection.exec <<-SQL + CREATE TABLE IF NOT EXISTS #{tasks_table} ( + id TEXT PRIMARY KEY, + data TEXT NOT NULL, + score BIGINT NOT NULL + ); + SQL + + connection.exec <<-SQL + CREATE INDEX IF NOT EXISTS idx_#{tasks_table}_score + ON #{tasks_table} (score); + SQL + + connection.exec <<-SQL + CREATE TABLE IF NOT EXISTS #{progress_table} ( + id TEXT PRIMARY KEY, + data TEXT NOT NULL, + expires_at TIMESTAMP WITH TIME ZONE + ); + SQL + end + end + + private def with_transaction(&) + with_connection do |connection| + connection.transaction { |transaction| yield transaction.connection } + end + end + + private def with_connection(&) + client.retry do + client.using_connection { |connection| yield connection } + end + end + + private def self.create_database(connection, name) + clean_name = PG::EscapeHelper.escape_identifier(name) + + connection.exec <<-SQL + CREATE DATABASE #{clean_name}; + SQL + rescue error : PQ::PQError + message = error.message.to_s + raise error unless message.includes?(%("#{name}" already exists)) + end + + private def self.delete_database(connection, name) + clean_name = PG::EscapeHelper.escape_identifier(name) + + connection.exec <<-SQL + DROP DATABASE IF EXISTS #{clean_name}; + SQL + end + + struct Transaction + include Mel::Transaction + + def initialize(@postgres : Postgres, @connection : DB::Connection) + end + + def create(task : Task) + @connection.exec <<-SQL, task.id, task.to_json, task.time.to_unix + INSERT INTO #{@postgres.tasks_table} (id, data, score) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO NOTHING; + SQL + end + + def update(task : Task) + time = task.retry_time || task.time + + @connection.exec <<-SQL, task.id, task.to_json, time.to_unix + INSERT INTO #{@postgres.tasks_table} (id, data, score) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, score = $3; + SQL + end + + def set_progress(id : String, value : Int, description : String) + report = Progress::Report.new(id, description, value) + expiry = Mel.settings.progress_expiry.try(&.from_now) + + @connection.exec <<-SQL, id, report.to_json, expiry + INSERT INTO #{@postgres.progress_table} (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3; + SQL + end + end + end +end From 274303bd8ea362c8ee7f4e88e7092e5870d5d653 Mon Sep 17 00:00:00 2001 From: akadusei Date: Wed, 7 Jan 2026 13:34:05 +0000 Subject: [PATCH 02/13] Replace `break` with `next` in block `break` exits the block preventing any code called after the block was yielded to run. --- src/postgres.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 142ef66..fd6531f 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -93,7 +93,7 @@ module Mel connection.exec(running_update_sql, running_score, ids) RunPool.update(ids) - break data.empty? ? nil : data + next data.empty? ? nil : data end data = connection.query_all( @@ -142,7 +142,7 @@ module Mel connection.exec(running_update_sql, running_score, ids) RunPool.update(ids) - break data.empty? ? nil : data + next data.empty? ? nil : data end data = connection.query_all(select_sql, 0, limit, as: String) From d7b1c0a2cf6a79a454bfdf7628a1f88d5f583a48 Mon Sep 17 00:00:00 2001 From: akadusei Date: Wed, 7 Jan 2026 22:09:36 +0000 Subject: [PATCH 03/13] Simplify method --- src/postgres.cr | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index fd6531f..b34772e 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -185,20 +185,20 @@ module Mel def get_progress(ids : Indexable) : Array(String)? return if ids.empty? - data = with_transaction do |connection| + with_transaction do |connection| connection.exec <<-SQL DELETE FROM #{progress_table} WHERE expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP; SQL - connection.query_all <<-SQL, ids.map(&.to_s), as: String + data = connection.query_all <<-SQL, ids.map(&.to_s), as: String SELECT data FROM #{progress_table} WHERE id = ANY($1) AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP); SQL - end - data.try { |_data| _data unless _data.empty? } + data unless data.empty? + end end def truncate_progress From 44e03c21d3081ec68e2e05b3920969705cb5ce26 Mon Sep 17 00:00:00 2001 From: akadusei Date: Wed, 7 Jan 2026 22:47:26 +0000 Subject: [PATCH 04/13] Update running tasks score every poll for Postgres adapter --- src/postgres.cr | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index b34772e..9fcece7 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -66,10 +66,6 @@ module Mel ORDER BY score LIMIT $3 FOR UPDATE SKIP LOCKED; SQL - running_update_sql = <<-SQL - UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); - SQL - select_sql = <<-SQL SELECT data FROM #{tasks_table} WHERE score >= $1 AND score <= $2 ORDER BY score LIMIT $3; @@ -79,6 +75,8 @@ module Mel with_transaction do |connection| if delete.nil? + to_running(connection, RunPool.fetch) + values = connection.query_all( running_select_sql, orphan_score, @@ -90,7 +88,7 @@ module Mel data = values.map(&.[:data]) ids = values.map(&.[:id]) - connection.exec(running_update_sql, running_score, ids) + to_running(connection, ids) RunPool.update(ids) next data.empty? ? nil : data @@ -116,10 +114,6 @@ module Mel ORDER BY score LIMIT $2 FOR UPDATE SKIP LOCKED; SQL - running_update_sql = <<-SQL - UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); - SQL - select_sql = <<-SQL SELECT data FROM #{tasks_table} WHERE score >= $1 ORDER BY score LIMIT $2; @@ -129,6 +123,8 @@ module Mel with_transaction do |connection| if delete.nil? + to_running(connection, RunPool.fetch) + values = connection.query_all( running_select_sql, orphan_score, @@ -139,7 +135,7 @@ module Mel data = values.map(&.[:data]) ids = values.map(&.[:id]) - connection.exec(running_update_sql, running_score, ids) + to_running(connection, ids) RunPool.update(ids) next data.empty? ? nil : data @@ -234,6 +230,14 @@ module Mel end end + private def to_running(connection, ids) + sql = <<-SQL + UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); + SQL + + connection.exec(sql, running_score, ids.to_a) + end + private def with_transaction(&) with_connection do |connection| connection.transaction { |transaction| yield transaction.connection } From ad0960874b8b2928786a1ef45a37b600d1f623f1 Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 14:05:21 +0000 Subject: [PATCH 05/13] Avoid needless database query --- src/postgres.cr | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 9fcece7..3d3265b 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -231,11 +231,11 @@ module Mel end private def to_running(connection, ids) - sql = <<-SQL + return if ids.empty? + + connection.exec <<-SQL, running_score, ids.to_a UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); SQL - - connection.exec(sql, running_score, ids.to_a) end private def with_transaction(&) From f8351fedc3ade485a04b2308c479e10e32d2890a Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 16:05:09 +0000 Subject: [PATCH 06/13] Simplify methods --- src/postgres.cr | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 3d3265b..8016621 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -171,11 +171,9 @@ module Mel end def truncate - with_connection do |connection| - connection.exec <<-SQL - TRUNCATE TABLE #{tasks_table}; - SQL - end + with_connection &.exec <<-SQL + TRUNCATE TABLE #{tasks_table}; + SQL end def get_progress(ids : Indexable) : Array(String)? @@ -198,11 +196,9 @@ module Mel end def truncate_progress - with_connection do |connection| - connection.exec <<-SQL - TRUNCATE TABLE #{progress_table}; - SQL - end + with_connection &.exec <<-SQL + TRUNCATE TABLE #{progress_table}; + SQL end private def create_tables @@ -239,8 +235,8 @@ module Mel end private def with_transaction(&) - with_connection do |connection| - connection.transaction { |transaction| yield transaction.connection } + with_connection &.transaction do |transaction| + yield transaction.connection end end From c6a63e9707c3641c8cb0fc26fe2b4d27a5b08865 Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 17:35:35 +0000 Subject: [PATCH 07/13] Rename helper methods - `#orphan_score` -> `#orphan_timestamp` - `#running_score` -> `#running_timestamp` This gives it a more general name as "score" is specific to redis. --- src/mel/memory.cr | 6 +++--- src/mel/run_pool.cr | 6 +++--- src/mel/store.cr | 19 +++++++++++-------- src/postgres.cr | 6 +++--- src/redis.cr | 8 ++++---- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/mel/memory.cr b/src/mel/memory.cr index 6234282..088d0ac 100644 --- a/src/mel/memory.cr +++ b/src/mel/memory.cr @@ -118,8 +118,8 @@ module Mel private def query(count, delete, time = nil) queue = sorted_queue.select do |_, value| if delete.nil? - next orphan_score <= value <= time.to_unix if time - next value >= orphan_score + next orphan_timestamp <= value <= time.to_unix if time + next value >= orphan_timestamp end next 0 <= value <= time.to_unix if time @@ -138,7 +138,7 @@ module Mel end private def to_running(ids) - ids.each { |id| queue[id] = running_score } + ids.each { |id| queue[id] = running_timestamp } ids end diff --git a/src/mel/run_pool.cr b/src/mel/run_pool.cr index 89b73f3..6a18f38 100644 --- a/src/mel/run_pool.cr +++ b/src/mel/run_pool.cr @@ -1,8 +1,8 @@ module Mel - # This saves a worker's currently running tasks in memory. + # Saves a worker's currently running tasks in memory. # - # Every time a worker polls for tasks, it updates the timestamp (score) of - # its running tasks in the store. The Pool is how it knows which tasks to + # Every time a worker polls for tasks, it updates the timestamp of + # its running tasks in the store. This pool is how it knows which tasks to # update. module RunPool extend self diff --git a/src/mel/store.cr b/src/mel/store.cr index d420401..79b4f0f 100644 --- a/src/mel/store.cr +++ b/src/mel/store.cr @@ -89,19 +89,22 @@ module Mel transaction &.set_progress(id, value, description) end - # We assume a task is orphaned if its score has not been updated after - # 3 polls. - private def orphan_after - poll_interval = Mel.settings.poll_interval - {poll_interval * 3, poll_interval + 1.second}.max + private def orphan_timestamp + -orphan_after.ago.to_unix end - private def running_score + private def running_timestamp -Time.local.to_unix end - private def orphan_score - -orphan_after.ago.to_unix + # A task is assumed to be orphaned if its timestamp has not been updated + # after 3 polls. + # + # This must never be less than 1 second, since timestamps are saved in + # seconds. + private def orphan_after + poll_interval = Mel.settings.poll_interval + {poll_interval * 3, poll_interval + 1.second}.max end end end diff --git a/src/postgres.cr b/src/postgres.cr index 8016621..3705d0a 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -79,7 +79,7 @@ module Mel values = connection.query_all( running_select_sql, - orphan_score, + orphan_timestamp, time.to_unix, limit, as: {id: String, data: String} @@ -127,7 +127,7 @@ module Mel values = connection.query_all( running_select_sql, - orphan_score, + orphan_timestamp, limit, as: {id: String, data: String} ) @@ -229,7 +229,7 @@ module Mel private def to_running(connection, ids) return if ids.empty? - connection.exec <<-SQL, running_score, ids.to_a + connection.exec <<-SQL, running_timestamp, ids.to_a UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); SQL end diff --git a/src/redis.cr b/src/redis.cr index e797e08..8eec11f 100644 --- a/src/redis.cr +++ b/src/redis.cr @@ -38,10 +38,10 @@ module Mel if delete.nil? ids = client.eval(LUA, {key.name}, { - orphan_score.to_s, + orphan_timestamp.to_s, time.to_unix.to_s, count.to_s, - running_score.to_s, + running_timestamp.to_s, run_pool_lua }).as(Array) @@ -64,10 +64,10 @@ module Mel if delete.nil? ids = client.eval(LUA, {key.name}, { - orphan_score.to_s, + orphan_timestamp.to_s, "+inf", count.to_s, - running_score.to_s, + running_timestamp.to_s, run_pool_lua }).as(Array) From 4a77429a349d5fb9e0a484a06cc7e72a3fe5a273 Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 17:39:44 +0000 Subject: [PATCH 08/13] Rename database column from "score" to "schedule" --- src/postgres.cr | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 3705d0a..8e26b2a 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -62,13 +62,15 @@ module Mel return if count.zero? running_select_sql = <<-SQL - SELECT id, data FROM #{tasks_table} WHERE score >= $1 AND score <= $2 - ORDER BY score LIMIT $3 FOR UPDATE SKIP LOCKED; + SELECT id, data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; SQL select_sql = <<-SQL - SELECT data FROM #{tasks_table} WHERE score >= $1 AND score <= $2 - ORDER BY score LIMIT $3; + SELECT data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3; SQL limit = count > 0 ? count : nil @@ -110,13 +112,13 @@ module Mel return if count.zero? running_select_sql = <<-SQL - SELECT id, data FROM #{tasks_table} WHERE score >= $1 - ORDER BY score LIMIT $2 FOR UPDATE SKIP LOCKED; + SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; SQL select_sql = <<-SQL - SELECT data FROM #{tasks_table} WHERE score >= $1 - ORDER BY score LIMIT $2; + SELECT data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2; SQL limit = count > 0 ? count : nil @@ -207,13 +209,13 @@ module Mel CREATE TABLE IF NOT EXISTS #{tasks_table} ( id TEXT PRIMARY KEY, data TEXT NOT NULL, - score BIGINT NOT NULL + schedule BIGINT NOT NULL ); SQL connection.exec <<-SQL - CREATE INDEX IF NOT EXISTS idx_#{tasks_table}_score - ON #{tasks_table} (score); + CREATE INDEX IF NOT EXISTS idx_#{tasks_table}_schedule + ON #{tasks_table} (schedule); SQL connection.exec <<-SQL @@ -230,7 +232,7 @@ module Mel return if ids.empty? connection.exec <<-SQL, running_timestamp, ids.to_a - UPDATE #{tasks_table} SET score = $1 WHERE id = ANY($2); + UPDATE #{tasks_table} SET schedule = $1 WHERE id = ANY($2); SQL end @@ -273,7 +275,7 @@ module Mel def create(task : Task) @connection.exec <<-SQL, task.id, task.to_json, task.time.to_unix - INSERT INTO #{@postgres.tasks_table} (id, data, score) + INSERT INTO #{@postgres.tasks_table} (id, data, schedule) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING; SQL @@ -283,9 +285,9 @@ module Mel time = task.retry_time || task.time @connection.exec <<-SQL, task.id, task.to_json, time.to_unix - INSERT INTO #{@postgres.tasks_table} (id, data, score) + INSERT INTO #{@postgres.tasks_table} (id, data, schedule) VALUES ($1, $2, $3) - ON CONFLICT (id) DO UPDATE SET data = $2, score = $3; + ON CONFLICT (id) DO UPDATE SET data = $2, schedule = $3; SQL end From 000015ba80a0371d734aa01fa8293e68ba6afbfb Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 22:36:00 +0000 Subject: [PATCH 09/13] Honor `delete` when querying tasks in `Mel::Postgres` adapter --- src/postgres.cr | 65 ++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 8e26b2a..925aecf 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -61,49 +61,41 @@ module Mel ) : Array(String)? return if count.zero? - running_select_sql = <<-SQL + sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; SQL - select_sql = <<-SQL - SELECT data FROM #{tasks_table} - WHERE schedule >= $1 AND schedule <= $2 - ORDER BY schedule LIMIT $3; - SQL - limit = count > 0 ? count : nil with_transaction do |connection| if delete.nil? to_running(connection, RunPool.fetch) - values = connection.query_all( - running_select_sql, + ids, data = unpack connection.query_all( + sql, orphan_timestamp, time.to_unix, limit, as: {id: String, data: String} ) - data = values.map(&.[:data]) - ids = values.map(&.[:id]) - to_running(connection, ids) RunPool.update(ids) next data.empty? ? nil : data end - data = connection.query_all( - select_sql, + ids, data = unpack connection.query_all( + sql, 0, time.to_unix, limit, - as: String + as: {id: String, data: String} ) + delete_tasks(connection, ids) if delete data unless data.empty? end end @@ -111,39 +103,38 @@ module Mel def find(count : Int, *, delete : Bool? = false) : Array(String)? return if count.zero? - running_select_sql = <<-SQL + sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; SQL - select_sql = <<-SQL - SELECT data FROM #{tasks_table} WHERE schedule >= $1 - ORDER BY schedule LIMIT $2; - SQL - limit = count > 0 ? count : nil with_transaction do |connection| if delete.nil? to_running(connection, RunPool.fetch) - values = connection.query_all( - running_select_sql, + ids, data = unpack connection.query_all( + sql, orphan_timestamp, limit, as: {id: String, data: String} ) - data = values.map(&.[:data]) - ids = values.map(&.[:id]) - to_running(connection, ids) RunPool.update(ids) next data.empty? ? nil : data end - data = connection.query_all(select_sql, 0, limit, as: String) + ids, data = unpack connection.query_all( + sql, + 0, + limit, + as: {id: String, data: String} + ) + + delete_tasks(connection, ids) if delete data unless data.empty? end end @@ -156,12 +147,7 @@ module Mel SELECT data FROM #{tasks_table} WHERE id = ANY($1); SQL - if delete - connection.exec <<-SQL, ids.map(&.to_s) - DELETE FROM #{tasks_table} WHERE id = ANY($1); - SQL - end - + delete_tasks(connection, ids) if delete data unless data.empty? end end @@ -228,6 +214,12 @@ module Mel end end + private def delete_tasks(connection, ids) + connection.exec <<-SQL, ids.map(&.to_s) + DELETE FROM #{tasks_table} WHERE id = ANY($1); + SQL + end + private def to_running(connection, ids) return if ids.empty? @@ -248,6 +240,13 @@ module Mel end end + private def unpack(values) + ids = values.map(&.[:id]) + data = values.map(&.[:data]) + + {ids, data} + end + private def self.create_database(connection, name) clean_name = PG::EscapeHelper.escape_identifier(name) From 1b381bf5e4f7a3a2fa8821bc6fce6bcb752626eb Mon Sep 17 00:00:00 2001 From: akadusei Date: Thu, 8 Jan 2026 23:46:44 +0000 Subject: [PATCH 10/13] Remove `setup` parameter from `Mel::Postgres.new` --- README.md | 11 ++++++----- spec/setup/worker.cr | 7 +++++-- spec/spec_helper.cr | 4 ++-- src/postgres.cr | 4 +--- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 81206a5..ca38b5b 100644 --- a/README.md +++ b/README.md @@ -86,13 +86,14 @@ This makes the storage backend the *source of truth* for schedules, allowing to require "mel/postgres" + db_url = "postgres://username:password@localhost:5432/database_name" + + # Uncomment to create database + #Mel::Postgres.create_database(db_url) + Mel.configure do |settings| # ... - settings.store = Mel::Postgres.new( - "postgres://username:password@localhost:5432/database_name", - namespace: "mel", - setup: true # <= Creates database - ) + settings.store = Mel::Postgres.new(db_url, namespace: "mel") # ... end diff --git a/spec/setup/worker.cr b/spec/setup/worker.cr index 549fc99..4826d1b 100644 --- a/spec/setup/worker.cr +++ b/spec/setup/worker.cr @@ -4,6 +4,9 @@ Mel.configure do |settings| settings.timezone = Time::Location.load("America/Los_Angeles") end +Mel::Postgres.create_database(ENV["COCKROACH_URL"]) +Mel::Postgres.create_database(ENV["POSTGRES_URL"]) + tasks = ->do Mel.stop Mel::RunPool.delete @@ -16,8 +19,8 @@ Spec.around_each do |spec| { Mel::Memory.new, - Mel::Postgres.new(ENV["COCKROACH_URL"], setup: true), - Mel::Postgres.new(ENV["POSTGRES_URL"], setup: true), + Mel::Postgres.new(ENV["COCKROACH_URL"]), + Mel::Postgres.new(ENV["POSTGRES_URL"]), Mel::Redis.new(ENV["REDIS_URL"]) }.each do |store| Mel.settings.store = store diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 53822c4..4945de0 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -4,11 +4,11 @@ require "log/spec" require "carbon" require "timecop" +require "../src/redis" +require "../src/postgres" require "../src/spec" require "./setup/**" require "./support/**" require "../src/carbon" -require "../src/postgres" -require "../src/redis" include Carbon::Expectations diff --git a/src/postgres.cr b/src/postgres.cr index 925aecf..52ccb1b 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -18,9 +18,7 @@ module Mel create_tables end - def self.new(url, namespace = :mel, *, setup = false) - create_database(url) if setup - + def self.new(url, namespace = :mel) new DB.open(url), namespace end From ce3e4f7796ea60790285bae6dd08632c8fd15eec Mon Sep 17 00:00:00 2001 From: akadusei Date: Fri, 9 Jan 2026 01:05:17 +0000 Subject: [PATCH 11/13] Split `Mel::Postgres#find*` methods into multiple helpers --- src/postgres.cr | 223 +++++++++++++++++++++++++++++++----------------- 1 file changed, 143 insertions(+), 80 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 52ccb1b..ce51fed 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -59,82 +59,15 @@ module Mel ) : Array(String)? return if count.zero? - sql = <<-SQL - SELECT id, data FROM #{tasks_table} - WHERE schedule >= $1 AND schedule <= $2 - ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; - SQL - - limit = count > 0 ? count : nil - - with_transaction do |connection| - if delete.nil? - to_running(connection, RunPool.fetch) - - ids, data = unpack connection.query_all( - sql, - orphan_timestamp, - time.to_unix, - limit, - as: {id: String, data: String} - ) - - to_running(connection, ids) - RunPool.update(ids) - - next data.empty? ? nil : data - end - - ids, data = unpack connection.query_all( - sql, - 0, - time.to_unix, - limit, - as: {id: String, data: String} - ) - - delete_tasks(connection, ids) if delete - data unless data.empty? - end + return find_due_update(time, count) if delete.nil? + delete ? find_due_delete(time, count) : find_due_no_delete(time, count) end def find(count : Int, *, delete : Bool? = false) : Array(String)? return if count.zero? - sql = <<-SQL - SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 - ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; - SQL - - limit = count > 0 ? count : nil - - with_transaction do |connection| - if delete.nil? - to_running(connection, RunPool.fetch) - - ids, data = unpack connection.query_all( - sql, - orphan_timestamp, - limit, - as: {id: String, data: String} - ) - - to_running(connection, ids) - RunPool.update(ids) - - next data.empty? ? nil : data - end - - ids, data = unpack connection.query_all( - sql, - 0, - limit, - as: {id: String, data: String} - ) - - delete_tasks(connection, ids) if delete - data unless data.empty? - end + return find_update(count) if delete.nil? + delete ? find_delete(count) : find_no_delete(count) end def find(ids : Indexable, *, delete : Bool = false) : Array(String)? @@ -145,7 +78,7 @@ module Mel SELECT data FROM #{tasks_table} WHERE id = ANY($1); SQL - delete_tasks(connection, ids) if delete + delete(connection, ids) if delete data unless data.empty? end end @@ -212,7 +145,133 @@ module Mel end end - private def delete_tasks(connection, ids) + private def find_due_delete(time, count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; + SQL + + with_transaction do |connection| + ids, data = unpack connection.query_all( + sql, + 0, + time.to_unix, + limit(count), + as: {id: String, data: String} + ) + + delete(connection, ids) + data unless data.empty? + end + end + + private def find_due_no_delete(time, count) + sql = <<-SQL + SELECT data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3; + SQL + + with_transaction do |connection| + data = connection.query_all( + sql, + 0, + time.to_unix, + limit(count), + as: String + ) + + data unless data.empty? + end + end + + private def find_due_update(time, count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; + SQL + + with_transaction do |connection| + to_running(connection, RunPool.fetch) + + ids, data = unpack connection.query_all( + sql, + orphan_timestamp, + time.to_unix, + limit(count), + as: {id: String, data: String} + ) + + to_running(connection, ids) + RunPool.update(ids) + + data unless data.empty? + end + end + + private def find_delete(count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; + SQL + + with_transaction do |connection| + ids, data = unpack connection.query_all( + sql, + 0, + limit(count), + as: {id: String, data: String} + ) + + delete(connection, ids) + data unless data.empty? + end + end + + private def find_no_delete(count) + sql = <<-SQL + SELECT data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2; + SQL + + with_transaction do |connection| + data = connection.query_all( + sql, + 0, + limit(count), + as: String + ) + + data unless data.empty? + end + end + + private def find_update(count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; + SQL + + with_transaction do |connection| + to_running(connection, RunPool.fetch) + + ids, data = unpack connection.query_all( + sql, + orphan_timestamp, + limit(count), + as: {id: String, data: String} + ) + + to_running(connection, ids) + RunPool.update(ids) + + data unless data.empty? + end + end + + private def delete(connection, ids) connection.exec <<-SQL, ids.map(&.to_s) DELETE FROM #{tasks_table} WHERE id = ANY($1); SQL @@ -226,6 +285,17 @@ module Mel SQL end + private def limit(count) + count > 0 ? count : nil + end + + private def unpack(values) + ids = values.map(&.[:id]) + data = values.map(&.[:data]) + + {ids, data} + end + private def with_transaction(&) with_connection &.transaction do |transaction| yield transaction.connection @@ -238,13 +308,6 @@ module Mel end end - private def unpack(values) - ids = values.map(&.[:id]) - data = values.map(&.[:data]) - - {ids, data} - end - private def self.create_database(connection, name) clean_name = PG::EscapeHelper.escape_identifier(name) From 7342af3dd705ce3afc1b1dc413c53b0372bc17b4 Mon Sep 17 00:00:00 2001 From: akadusei Date: Fri, 9 Jan 2026 11:14:44 +0000 Subject: [PATCH 12/13] Rename helper methods --- src/postgres.cr | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index ce51fed..5d863ec 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -59,15 +59,18 @@ module Mel ) : Array(String)? return if count.zero? - return find_due_update(time, count) if delete.nil? - delete ? find_due_delete(time, count) : find_due_no_delete(time, count) + return find_due_delete_nil(time, count) if delete.nil? + + delete ? + find_due_delete_true(time, count) : + find_due_delete_false(time, count) end def find(count : Int, *, delete : Bool? = false) : Array(String)? return if count.zero? - return find_update(count) if delete.nil? - delete ? find_delete(count) : find_no_delete(count) + return find_delete_nil(count) if delete.nil? + delete ? find_delete_true(count) : find_delete_false(count) end def find(ids : Indexable, *, delete : Bool = false) : Array(String)? @@ -145,7 +148,7 @@ module Mel end end - private def find_due_delete(time, count) + private def find_due_delete_true(time, count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 @@ -166,7 +169,7 @@ module Mel end end - private def find_due_no_delete(time, count) + private def find_due_delete_false(time, count) sql = <<-SQL SELECT data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 @@ -186,7 +189,7 @@ module Mel end end - private def find_due_update(time, count) + private def find_due_delete_nil(time, count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 @@ -211,7 +214,7 @@ module Mel end end - private def find_delete(count) + private def find_delete_true(count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; @@ -230,7 +233,7 @@ module Mel end end - private def find_no_delete(count) + private def find_delete_false(count) sql = <<-SQL SELECT data FROM #{tasks_table} WHERE schedule >= $1 ORDER BY schedule LIMIT $2; @@ -248,7 +251,7 @@ module Mel end end - private def find_update(count) + private def find_delete_nil(count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; From 1e62afe504b84bdcc337212d38b92359f3540943 Mon Sep 17 00:00:00 2001 From: akadusei Date: Fri, 9 Jan 2026 14:28:24 +0000 Subject: [PATCH 13/13] Fix spec failures in CockroachDB ``` Failures: 1) Mel::Task #run does not retry beyond current schedule window Failure/Error: Mel::PeriodicTask.find(id).try(&.attempts).should eq(0) Expected: 0 got: 1 Error: # spec/mel/task_spec.cr:62 2) Mel::Task #run retries beyond current schedule window if task not rescheduled Failure/Error: Mel::PeriodicTask.find(id).try(&.attempts).should eq(2) Expected: 2 got: 1 Error: # spec/mel/task_spec.cr:84 3) Mel::Task #run deletes tasks from env after completion Failure/Error: Mel::RunPool.fetch.should_not be_empty Expected: Set{} not to be empty Error: # spec/mel/task_spec.cr:107 4) Mel::Task .find_due skips running jobs Failure/Error: Mel::InstantTask.find_due(count: -1, delete: nil).try(&.size).should eq(1) Expected: 1 got: nil Error: # spec/mel/task_spec.cr:214 Finished in 36.32 seconds 352 examples, 4 failures, 0 errors, 0 pending Failed examples: crystal spec spec/mel/task_spec.cr:49 # Mel::Task #run does not retry beyond current schedule window crystal spec spec/mel/task_spec.cr:66 # Mel::Task #run retries beyond current schedule window if task not rescheduled crystal spec spec/mel/task_spec.cr:97 # Mel::Task #run deletes tasks from env after completion crystal spec spec/mel/task_spec.cr:211 # Mel::Task .find_due skips running jobs Error: Process completed with exit code 1. ``` See . --- src/postgres.cr | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/postgres.cr b/src/postgres.cr index 5d863ec..298baff 100644 --- a/src/postgres.cr +++ b/src/postgres.cr @@ -152,7 +152,7 @@ module Mel sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 - ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; + ORDER BY schedule LIMIT $3 FOR UPDATE; SQL with_transaction do |connection| @@ -193,7 +193,7 @@ module Mel sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 AND schedule <= $2 - ORDER BY schedule LIMIT $3 FOR UPDATE SKIP LOCKED; + ORDER BY schedule LIMIT $3 FOR UPDATE; SQL with_transaction do |connection| @@ -217,7 +217,7 @@ module Mel private def find_delete_true(count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 - ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; + ORDER BY schedule LIMIT $2 FOR UPDATE; SQL with_transaction do |connection| @@ -254,7 +254,7 @@ module Mel private def find_delete_nil(count) sql = <<-SQL SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 - ORDER BY schedule LIMIT $2 FOR UPDATE SKIP LOCKED; + ORDER BY schedule LIMIT $2 FOR UPDATE; SQL with_transaction do |connection|