diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 59d5eb4..877bf9a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -39,14 +39,17 @@ jobs: strategy: fail-fast: false matrix: - crystal: ['1.0.0'] + cockroachdb: ['v22.2.0', latest] + crystal: ['1.4.0'] experimental: [false] shard_file: [shard.yml] include: - - crystal: latest + - cockroachdb: latest + crystal: latest experimental: false shard_file: shard.latest.yml - - crystal: nightly + - cockroachdb: latest + crystal: nightly experimental: true shard_file: shard.edge.yml runs-on: ubuntu-latest @@ -74,7 +77,32 @@ jobs: sudo apt -y install redis - name: Start Redis run: sudo systemctl start redis + - name: Install Postgresql + run: | + sudo apt update + sudo apt -y install postgresql + - name: Start Postgresql + run: sudo systemctl start postgresql + - name: Set Postgres password + run: >- + sudo -u postgres psql -c + "ALTER USER postgres WITH PASSWORD 'password';" + - name: Install CockroachDB + run: | + sudo apt update + sudo apt -y install tar wget + sudo mkdir -p /usr/local/lib/cockroach + wget -O cockroachdb.tgz https://binaries.cockroachdb.com/cockroach-${{ matrix.cockroachdb }}.linux-amd64.tgz + tar -xzf cockroachdb.tgz + sudo cp -f cockroach-*/cockroach /usr/local/bin/ + sudo chmod +x /usr/local/bin/cockroach + sudo cp -rf cockroach-*/lib/* /usr/local/lib/cockroach/ + working-directory: /tmp + - name: Start CockroachDB + run: cockroach start-single-node --insecure --listen-addr=localhost:36257 --sql-addr=localhost:26257 --background - name: Run tests env: + COCKROACH_URL: postgres://root@localhost:26257/mel_spec?sslmode=disable + POSTGRES_URL: postgres://postgres:password@localhost:5432/mel_spec REDIS_URL: redis://localhost:6379/0 run: crystal spec --error-on-warnings -Dpreview_mt diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d5efd4..ed763c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,12 @@ and this project adheres to [Semantic 
Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] - ### Added +- Add `Mel::Postgres` adapter - Add `#be_enqueued` spec expectation +### Changed +- Bump minimum required Crystal version to 1.4 + ### Changed - Move Lua script into its own file diff --git a/README.md b/README.md index 75c631a..ca38b5b 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ This makes the storage backend the *source of truth* for schedules, allowing to github: GrottoPress/mel #redis: # Uncomment if using the Redis backend # github: jgaskins/redis + #pg: # Uncomment if using the Postgres backend + # github: will/crystal-pg ``` 1. Run `shards update` @@ -75,6 +77,29 @@ This makes the storage backend the *source of truth* for schedules, allowing to # ... ``` + - Using the Postgres backend + + ```crystal + # ->>> src/app/config.cr + + # ... + + require "mel/postgres" + + db_url = "postgres://username:password@localhost:5432/database_name" + + # Uncomment to create database + #Mel::Postgres.create_database(db_url) + + Mel.configure do |settings| + # ... + settings.store = Mel::Postgres.new(db_url, namespace: "mel") + # ... + end + + # ... 
+ ``` + - Using the Memory backend (Not for production use) ```crystal @@ -800,8 +825,10 @@ If it is a negative integer `-N` (other than `-1`), the number of due tasks pul Create a `.env.sh` file: ```bash -#!/bin/bash +#!/usr/bin/env bash +export COCKROACH_URL='postgres://root@localhost:26257/mel_spec?sslmode=disable' +export POSTGRES_URL='postgres://postgres:password@localhost:5432/mel_spec' export REDIS_URL='redis://localhost:6379/0' ``` diff --git a/shard.yml b/shard.yml index c6f9387..dfd9b7c 100644 --- a/shard.yml +++ b/shard.yml @@ -8,7 +8,7 @@ authors: license: MIT -crystal: ~> 1.0 +crystal: ~> 1.4 dependencies: cron_parser: @@ -25,6 +25,9 @@ development_dependencies: carbon: github: luckyframework/carbon version: ~> 0.2.0 + pg: + github: will/crystal-pg + version: ~> 0.29.0 redis: github: jgaskins/redis version: ~> 0.8.0 diff --git a/spec/setup/worker.cr b/spec/setup/worker.cr index 1a4be1b..4826d1b 100644 --- a/spec/setup/worker.cr +++ b/spec/setup/worker.cr @@ -4,6 +4,9 @@ Mel.configure do |settings| settings.timezone = Time::Location.load("America/Los_Angeles") end +Mel::Postgres.create_database(ENV["COCKROACH_URL"]) +Mel::Postgres.create_database(ENV["POSTGRES_URL"]) + tasks = ->do Mel.stop Mel::RunPool.delete @@ -14,7 +17,12 @@ end Spec.around_each do |spec| next spec.run if all_tags(spec.example).includes?("skip_around_each") - {Mel::Memory.new, Mel::Redis.new(ENV["REDIS_URL"])}.each do |store| + { + Mel::Memory.new, + Mel::Postgres.new(ENV["COCKROACH_URL"]), + Mel::Postgres.new(ENV["POSTGRES_URL"]), + Mel::Redis.new(ENV["REDIS_URL"]) + }.each do |store| Mel.settings.store = store tasks.call spec.run diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 03734cb..4945de0 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -4,10 +4,11 @@ require "log/spec" require "carbon" require "timecop" +require "../src/redis" +require "../src/postgres" require "../src/spec" require "./setup/**" require "./support/**" require "../src/carbon" 
-require "../src/redis" include Carbon::Expectations diff --git a/src/mel/memory.cr b/src/mel/memory.cr index 6234282..088d0ac 100644 --- a/src/mel/memory.cr +++ b/src/mel/memory.cr @@ -118,8 +118,8 @@ module Mel private def query(count, delete, time = nil) queue = sorted_queue.select do |_, value| if delete.nil? - next orphan_score <= value <= time.to_unix if time - next value >= orphan_score + next orphan_timestamp <= value <= time.to_unix if time + next value >= orphan_timestamp end next 0 <= value <= time.to_unix if time @@ -138,7 +138,7 @@ module Mel end private def to_running(ids) - ids.each { |id| queue[id] = running_score } + ids.each { |id| queue[id] = running_timestamp } ids end diff --git a/src/mel/run_pool.cr b/src/mel/run_pool.cr index 89b73f3..6a18f38 100644 --- a/src/mel/run_pool.cr +++ b/src/mel/run_pool.cr @@ -1,8 +1,8 @@ module Mel - # This saves a worker's currently running tasks in memory. + # Saves a worker's currently running tasks in memory. # - # Every time a worker polls for tasks, it updates the timestamp (score) of - # its running tasks in the store. The Pool is how it knows which tasks to + # Every time a worker polls for tasks, it updates the timestamp of + # its running tasks in the store. This pool is how it knows which tasks to # update. module RunPool extend self diff --git a/src/mel/store.cr b/src/mel/store.cr index d420401..79b4f0f 100644 --- a/src/mel/store.cr +++ b/src/mel/store.cr @@ -89,19 +89,22 @@ module Mel transaction &.set_progress(id, value, description) end - # We assume a task is orphaned if its score has not been updated after - # 3 polls. 
- private def orphan_after - poll_interval = Mel.settings.poll_interval - {poll_interval * 3, poll_interval + 1.second}.max + private def orphan_timestamp + -orphan_after.ago.to_unix end - private def running_score + private def running_timestamp -Time.local.to_unix end - private def orphan_score - -orphan_after.ago.to_unix + # A task is assumed to be orphaned if its timestamp has not been updated + # after 3 polls. + # + # This must never be less than 1 second, since timestamps are saved in + # seconds. + private def orphan_after + poll_interval = Mel.settings.poll_interval + {poll_interval * 3, poll_interval + 1.second}.max end end end diff --git a/src/postgres.cr b/src/postgres.cr new file mode 100644 index 0000000..298baff --- /dev/null +++ b/src/postgres.cr @@ -0,0 +1,369 @@ +require "pg" + +require "./mel" + +module Mel + struct Postgres + include Store + + getter :client, :progress_table, :tasks_table + + def initialize( + @client : DB::Database, + @namespace : Symbol | String = :mel + ) + @progress_table = "#{@namespace}_progress" + @tasks_table = "#{@namespace}_tasks" + + create_tables + end + + def self.new(url, namespace = :mel) + new DB.open(url), namespace + end + + def self.create_database(url : String) + create_database URI.parse(url) + end + + def self.create_database(url : URI) + db_name = url.path.lchop('/') + + default_url = URI.parse(url.to_s) + default_url.path = "/postgres" + + DB.connect(default_url) do |connection| + create_database(connection, db_name) + end + end + + def self.delete_database(url : String) + delete_database URI.parse(url) + end + + def self.delete_database(url : URI) + db_name = url.path.lchop('/') + + default_url = URI.parse(url.to_s) + default_url.path = "/postgres" + + DB.connect(default_url) do |connection| + delete_database(connection, db_name) + end + end + + def find_due( + at time = Time.local, + count : Int = -1, *, + delete : Bool? = false + ) : Array(String)? + return if count.zero? 
+ + return find_due_delete_nil(time, count) if delete.nil? + + delete ? + find_due_delete_true(time, count) : + find_due_delete_false(time, count) + end + + def find(count : Int, *, delete : Bool? = false) : Array(String)? + return if count.zero? + + return find_delete_nil(count) if delete.nil? + delete ? find_delete_true(count) : find_delete_false(count) + end + + def find(ids : Indexable, *, delete : Bool = false) : Array(String)? + return if ids.empty? + + with_transaction do |connection| + data = connection.query_all <<-SQL, ids.map(&.to_s), as: String + SELECT data FROM #{tasks_table} WHERE id = ANY($1); + SQL + + delete(connection, ids) if delete + data unless data.empty? + end + end + + def transaction(& : Transaction -> _) + with_transaction do |connection| + yield Transaction.new(self, connection) + end + end + + def truncate + with_connection &.exec <<-SQL + TRUNCATE TABLE #{tasks_table}; + SQL + end + + def get_progress(ids : Indexable) : Array(String)? + return if ids.empty? + + with_transaction do |connection| + connection.exec <<-SQL + DELETE FROM #{progress_table} + WHERE expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP; + SQL + + data = connection.query_all <<-SQL, ids.map(&.to_s), as: String + SELECT data FROM #{progress_table} + WHERE id = ANY($1) + AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP); + SQL + + data unless data.empty? 
+ end + end + + def truncate_progress + with_connection &.exec <<-SQL + TRUNCATE TABLE #{progress_table}; + SQL + end + + private def create_tables + with_transaction do |connection| + connection.exec <<-SQL + CREATE TABLE IF NOT EXISTS #{tasks_table} ( + id TEXT PRIMARY KEY, + data TEXT NOT NULL, + schedule BIGINT NOT NULL + ); + SQL + + connection.exec <<-SQL + CREATE INDEX IF NOT EXISTS idx_#{tasks_table}_schedule + ON #{tasks_table} (schedule); + SQL + + connection.exec <<-SQL + CREATE TABLE IF NOT EXISTS #{progress_table} ( + id TEXT PRIMARY KEY, + data TEXT NOT NULL, + expires_at TIMESTAMP WITH TIME ZONE + ); + SQL + end + end + + private def find_due_delete_true(time, count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3 FOR UPDATE; + SQL + + with_transaction do |connection| + ids, data = unpack connection.query_all( + sql, + 0, + time.to_unix, + limit(count), + as: {id: String, data: String} + ) + + delete(connection, ids) + data unless data.empty? + end + end + + private def find_due_delete_false(time, count) + sql = <<-SQL + SELECT data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3; + SQL + + with_transaction do |connection| + data = connection.query_all( + sql, + 0, + time.to_unix, + limit(count), + as: String + ) + + data unless data.empty? + end + end + + private def find_due_delete_nil(time, count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} + WHERE schedule >= $1 AND schedule <= $2 + ORDER BY schedule LIMIT $3 FOR UPDATE; + SQL + + with_transaction do |connection| + to_running(connection, RunPool.fetch) + + ids, data = unpack connection.query_all( + sql, + orphan_timestamp, + time.to_unix, + limit(count), + as: {id: String, data: String} + ) + + to_running(connection, ids) + RunPool.update(ids) + + data unless data.empty? 
+ end + end + + private def find_delete_true(count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2 FOR UPDATE; + SQL + + with_transaction do |connection| + ids, data = unpack connection.query_all( + sql, + 0, + limit(count), + as: {id: String, data: String} + ) + + delete(connection, ids) + data unless data.empty? + end + end + + private def find_delete_false(count) + sql = <<-SQL + SELECT data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2; + SQL + + with_transaction do |connection| + data = connection.query_all( + sql, + 0, + limit(count), + as: String + ) + + data unless data.empty? + end + end + + private def find_delete_nil(count) + sql = <<-SQL + SELECT id, data FROM #{tasks_table} WHERE schedule >= $1 + ORDER BY schedule LIMIT $2 FOR UPDATE; + SQL + + with_transaction do |connection| + to_running(connection, RunPool.fetch) + + ids, data = unpack connection.query_all( + sql, + orphan_timestamp, + limit(count), + as: {id: String, data: String} + ) + + to_running(connection, ids) + RunPool.update(ids) + + data unless data.empty? + end + end + + private def delete(connection, ids) + connection.exec <<-SQL, ids.map(&.to_s) + DELETE FROM #{tasks_table} WHERE id = ANY($1); + SQL + end + + private def to_running(connection, ids) + return if ids.empty? + + connection.exec <<-SQL, running_timestamp, ids.to_a + UPDATE #{tasks_table} SET schedule = $1 WHERE id = ANY($2); + SQL + end + + private def limit(count) + count > 0 ? 
count : nil + end + + private def unpack(values) + ids = values.map(&.[:id]) + data = values.map(&.[:data]) + + {ids, data} + end + + private def with_transaction(&) + with_connection &.transaction do |transaction| + yield transaction.connection + end + end + + private def with_connection(&) + client.retry do + client.using_connection { |connection| yield connection } + end + end + + private def self.create_database(connection, name) + clean_name = PG::EscapeHelper.escape_identifier(name) + + connection.exec <<-SQL + CREATE DATABASE #{clean_name}; + SQL + rescue error : PQ::PQError + message = error.message.to_s + raise error unless message.includes?(%("#{name}" already exists)) + end + + private def self.delete_database(connection, name) + clean_name = PG::EscapeHelper.escape_identifier(name) + + connection.exec <<-SQL + DROP DATABASE IF EXISTS #{clean_name}; + SQL + end + + struct Transaction + include Mel::Transaction + + def initialize(@postgres : Postgres, @connection : DB::Connection) + end + + def create(task : Task) + @connection.exec <<-SQL, task.id, task.to_json, task.time.to_unix + INSERT INTO #{@postgres.tasks_table} (id, data, schedule) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO NOTHING; + SQL + end + + def update(task : Task) + time = task.retry_time || task.time + + @connection.exec <<-SQL, task.id, task.to_json, time.to_unix + INSERT INTO #{@postgres.tasks_table} (id, data, schedule) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, schedule = $3; + SQL + end + + def set_progress(id : String, value : Int, description : String) + report = Progress::Report.new(id, description, value) + expiry = Mel.settings.progress_expiry.try(&.from_now) + + @connection.exec <<-SQL, id, report.to_json, expiry + INSERT INTO #{@postgres.progress_table} (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3; + SQL + end + end + end +end diff --git a/src/redis.cr b/src/redis.cr index e797e08..8eec11f 
100644 --- a/src/redis.cr +++ b/src/redis.cr @@ -38,10 +38,10 @@ module Mel if delete.nil? ids = client.eval(LUA, {key.name}, { - orphan_score.to_s, + orphan_timestamp.to_s, time.to_unix.to_s, count.to_s, - running_score.to_s, + running_timestamp.to_s, run_pool_lua }).as(Array) @@ -64,10 +64,10 @@ module Mel if delete.nil? ids = client.eval(LUA, {key.name}, { - orphan_score.to_s, + orphan_timestamp.to_s, "+inf", count.to_s, - running_score.to_s, + running_timestamp.to_s, run_pool_lua }).as(Array)