diff --git a/Gemfile.lock b/Gemfile.lock index bbf9032..299dbc3 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - delayed (2.0.1) + delayed (2.0.2) activerecord (>= 6.0) concurrent-ruby diff --git a/lib/delayed/helpers/migration.rb b/lib/delayed/helpers/migration.rb index f76454a..0df60fe 100644 --- a/lib/delayed/helpers/migration.rb +++ b/lib/delayed/helpers/migration.rb @@ -20,13 +20,17 @@ def concurrent_index_creation_supported? end def upsert_index(*args, **opts) - dir(:up) { _add_or_replace_index(*args, **opts) } - dir(:down) { _drop_index_if_exists(*args, **opts) } + with_retry_loop(**opts) do + dir(:up) { _add_or_replace_index(*args, **opts) } + dir(:down) { _drop_index_if_exists(*args, **opts) } + end end def remove_index_if_exists(*args, **opts) - dir(:up) { _drop_index_if_exists(*args, **opts) } - dir(:down) { _add_or_replace_index(*args, **opts) } + with_retry_loop(**opts) do + dir(:up) { _drop_index_if_exists(*args, **opts) } + dir(:down) { _add_or_replace_index(*args, **opts) } + end end RETRY_EXCEPTIONS = [ @@ -48,7 +52,7 @@ def with_retry_loop(wait_timeout: 5.minutes, **opts) end end - def with_timeouts(statement_timeout: 1.minute, lock_timeout: 5.seconds) + def with_timeouts(statement_timeout: 1.minute, lock_timeout: 5.seconds, **_opts) dir(:both) { set_timeouts!(statement_timeout: statement_timeout, lock_timeout: lock_timeout) } yield ensure @@ -61,24 +65,15 @@ def _add_or_replace_index(table, columns, **opts) index = _lookup_index(table, columns, **opts) if index && !_index_matches?(index, **opts) Delayed.logger.warn("Recreating index #{index.name} (is invalid or does not match desired options)") - _drop_index(table, name: index.name, **opts) + remove_index(table, name: index.name) end - _add_index(table, columns, **opts) if !index || !_index_matches?(index, **opts) + opts = opts.except(:wait_timeout, :statement_timeout, :lock_timeout) + add_index(table, columns, **opts) if !index || !_index_matches?(index, **opts) end def _drop_index_if_exists(table, columns, **opts) index = _lookup_index(table, columns, **opts) - _drop_index(table, name: index.name, **opts) if index - end - - def _add_index(*args, **opts) - index_opts = opts.slice!(:wait_timeout, :statement_timeout, :lock_timeout) - with_retry_loop(**opts) { add_index(*args, **index_opts) } - end - - def _drop_index(table, name:, **opts) - opts.slice!(:wait_timeout, :statement_timeout, :lock_timeout) - with_retry_loop(**opts) { remove_index(table, name: name) } + remove_index(table, name: index.name) if index end def _lookup_index(table, columns, **opts) diff --git a/lib/delayed/version.rb b/lib/delayed/version.rb index f100879..8efc176 100644 --- a/lib/delayed/version.rb +++ b/lib/delayed/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Delayed - VERSION = '2.0.1' + VERSION = '2.0.2' end diff --git a/spec/delayed/helpers/migration_spec.rb b/spec/delayed/helpers/migration_spec.rb index 4a9af50..226eb77 100644 --- a/spec/delayed/helpers/migration_spec.rb +++ b/spec/delayed/helpers/migration_spec.rb @@ -33,35 +33,50 @@ def direction.down migration.migration_start = Delayed::Job.db_time_now end - describe '#with_retry_loop timeout tracking' do + describe '#upsert_index retry behavior' do it 'raises exception when wait_timeout is exceeded based on @migration_start' do - # Simulate migration that started 6 minutes ago migration.migration_start = Delayed::Job.db_time_now - 6.minutes + allow(migration).to receive(:add_index).and_raise(ActiveRecord::LockWaitTimeout) + allow(migration.connection).to receive(:indexes).and_return([]) + expect { - migration.with_retry_loop(wait_timeout: 5.minutes) do - raise ActiveRecord::LockWaitTimeout - end + migration.upsert_index(:delayed_jobs, :name, wait_timeout: 5.minutes) }.to raise_error(ActiveRecord::LockWaitTimeout) end - it 'continues retrying while within the timeout window' do - call_count = 0 + it 're-checks for invalid index and drops it before retrying after timeout' do + add_index_calls = 0 + remove_index_calls = 0 + lookup_calls = 0 - # First retry is within timeout, second exceeds it - allow(Delayed::Job).to receive(:db_time_now).and_return( - migration.migration_start + 4.minutes, # Within timeout - migration.migration_start + 6.minutes, # Exceeds timeout + invalid_opts = ActiveRecord.version >= Gem::Version.new('7.1.0') ? { valid?: false } : { unique: true } + invalid_index = instance_double( + ActiveRecord::ConnectionAdapters::IndexDefinition, + name: 'test_idx', + columns: ['name'], + **invalid_opts, ) - expect { - migration.with_retry_loop(wait_timeout: 5.minutes) do - call_count += 1 - raise ActiveRecord::LockWaitTimeout - end - }.to raise_error(ActiveRecord::LockWaitTimeout) + allow(migration.connection).to receive(:indexes) do + lookup_calls += 1 + lookup_calls == 2 ? [invalid_index] : [] + end + + allow(migration).to receive(:add_index) do |*_args| + add_index_calls += 1 + raise ActiveRecord::StatementTimeout, 'timeout' if add_index_calls == 1 + end + + allow(migration).to receive(:remove_index) do |*_args| + remove_index_calls += 1 + end + + migration.upsert_index(:delayed_jobs, :name, name: 'test_idx', wait_timeout: 5.minutes) - expect(call_count).to eq(2) + expect(lookup_calls).to eq(3) + expect(remove_index_calls).to eq(1) + expect(add_index_calls).to eq(2) end end end