Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
delayed (2.0.1)
delayed (2.0.2)
activerecord (>= 6.0)
concurrent-ruby

Expand Down
31 changes: 13 additions & 18 deletions lib/delayed/helpers/migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ def concurrent_index_creation_supported?
end

def upsert_index(*args, **opts)
dir(:up) { _add_or_replace_index(*args, **opts) }
dir(:down) { _drop_index_if_exists(*args, **opts) }
with_retry_loop(**opts) do
dir(:up) { _add_or_replace_index(*args, **opts) }
dir(:down) { _drop_index_if_exists(*args, **opts) }
end
end

def remove_index_if_exists(*args, **opts)
dir(:up) { _drop_index_if_exists(*args, **opts) }
dir(:down) { _add_or_replace_index(*args, **opts) }
with_retry_loop(**opts) do
dir(:up) { _drop_index_if_exists(*args, **opts) }
dir(:down) { _add_or_replace_index(*args, **opts) }
end
end

RETRY_EXCEPTIONS = [
Expand All @@ -48,7 +52,7 @@ def with_retry_loop(wait_timeout: 5.minutes, **opts)
end
end

def with_timeouts(statement_timeout: 1.minute, lock_timeout: 5.seconds)
def with_timeouts(statement_timeout: 1.minute, lock_timeout: 5.seconds, **_opts)
dir(:both) { set_timeouts!(statement_timeout: statement_timeout, lock_timeout: lock_timeout) }
yield
ensure
Expand All @@ -61,24 +65,15 @@ def _add_or_replace_index(table, columns, **opts)
index = _lookup_index(table, columns, **opts)
if index && !_index_matches?(index, **opts)
Delayed.logger.warn("Recreating index #{index.name} (is invalid or does not match desired options)")
_drop_index(table, name: index.name, **opts)
remove_index(table, name: index.name)
end
_add_index(table, columns, **opts) if !index || !_index_matches?(index, **opts)
opts = opts.except(:wait_timeout, :statement_timeout, :lock_timeout)
add_index(table, columns, **opts) if !index || !_index_matches?(index, **opts)
end

def _drop_index_if_exists(table, columns, **opts)
index = _lookup_index(table, columns, **opts)
_drop_index(table, name: index.name, **opts) if index
end

def _add_index(*args, **opts)
index_opts = opts.slice!(:wait_timeout, :statement_timeout, :lock_timeout)
with_retry_loop(**opts) { add_index(*args, **index_opts) }
end

def _drop_index(table, name:, **opts)
opts.slice!(:wait_timeout, :statement_timeout, :lock_timeout)
with_retry_loop(**opts) { remove_index(table, name: name) }
remove_index(table, name: index.name) if index
end

def _lookup_index(table, columns, **opts)
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Delayed
VERSION = '2.0.1'
VERSION = '2.0.2'
end
51 changes: 33 additions & 18 deletions spec/delayed/helpers/migration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,50 @@ def direction.down
migration.migration_start = Delayed::Job.db_time_now
end

describe '#with_retry_loop timeout tracking' do
describe '#upsert_index retry behavior' do
it 'raises exception when wait_timeout is exceeded based on @migration_start' do
# Simulate migration that started 6 minutes ago
migration.migration_start = Delayed::Job.db_time_now - 6.minutes

allow(migration).to receive(:add_index).and_raise(ActiveRecord::LockWaitTimeout)
allow(migration.connection).to receive(:indexes).and_return([])

expect {
migration.with_retry_loop(wait_timeout: 5.minutes) do
raise ActiveRecord::LockWaitTimeout
end
migration.upsert_index(:delayed_jobs, :name, wait_timeout: 5.minutes)
}.to raise_error(ActiveRecord::LockWaitTimeout)
end

it 'continues retrying while within the timeout window' do
call_count = 0
it 're-checks for invalid index and drops it before retrying after timeout' do
add_index_calls = 0
remove_index_calls = 0
lookup_calls = 0

# First retry is within timeout, second exceeds it
allow(Delayed::Job).to receive(:db_time_now).and_return(
migration.migration_start + 4.minutes, # Within timeout
migration.migration_start + 6.minutes, # Exceeds timeout
invalid_opts = ActiveRecord.version >= Gem::Version.new('7.1.0') ? { valid?: false } : { unique: true }
invalid_index = instance_double(
ActiveRecord::ConnectionAdapters::IndexDefinition,
name: 'test_idx',
columns: ['name'],
**invalid_opts,
)

expect {
migration.with_retry_loop(wait_timeout: 5.minutes) do
call_count += 1
raise ActiveRecord::LockWaitTimeout
end
}.to raise_error(ActiveRecord::LockWaitTimeout)
allow(migration.connection).to receive(:indexes) do
lookup_calls += 1
lookup_calls == 2 ? [invalid_index] : []
end

allow(migration).to receive(:add_index) do |*_args|
add_index_calls += 1
raise ActiveRecord::StatementTimeout, 'timeout' if add_index_calls == 1
end

allow(migration).to receive(:remove_index) do |*_args|
remove_index_calls += 1
end

migration.upsert_index(:delayed_jobs, :name, name: 'test_idx', wait_timeout: 5.minutes)

expect(call_count).to eq(2)
expect(lookup_calls).to eq(3)
expect(remove_index_calls).to eq(1)
expect(add_index_calls).to eq(2)
end
end
end