diff --git a/app/models/concerns/good_job/advisory_lockable.rb b/app/models/concerns/good_job/advisory_lockable.rb index 4a6ea2f6..5b61a951 100644 --- a/app/models/concerns/good_job/advisory_lockable.rb +++ b/app/models/concerns/good_job/advisory_lockable.rb @@ -174,8 +174,9 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc if unlock_session advisory_unlock_session else + unlock_function = advisory_unlockable_function(function) records.each do |record| - record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function)) + record.advisory_unlock(key: record.lockable_column_key(column: column), function: unlock_function) if unlock_function end end end @@ -209,7 +210,8 @@ def advisory_lock_key(key, function: advisory_lockable_function) begin yield ensure - advisory_unlock_key(key, function: advisory_unlockable_function(function)) + unlock_function = advisory_unlockable_function(function) + advisory_unlock_key(key, function: unlock_function) if unlock_function end end @@ -284,6 +286,8 @@ def supports_cte_materialization_specifiers? # @param function [String, Symbol] name of advisory lock or unlock function # @return [Boolean] def advisory_unlockable_function(function = advisory_lockable_function) + return nil if function.include? "_xact_" + function.to_s.sub("_lock", "_unlock").sub("_try_", "_") end diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index 17a635c4..f28b807a 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -102,12 +102,17 @@ def enqueue(active_jobs = [], **properties, &block) active_jobs = add(active_jobs, &block) Rails.application.executor.wrap do - record.with_advisory_lock(function: "pg_advisory_lock") do - record.update!(enqueued_at: Time.current) - - # During inline execution, this could enqueue and execute further jobs - record._continue_discard_or_finish(lock: false) + buffer = GoodJob::Adapter::InlineBuffer.capture do + record.transaction do + record.with_advisory_lock(function: "pg_advisory_xact_lock") do + record.update!(enqueued_at: Time.current) + + # During inline execution, this could enqueue and execute further jobs + record._continue_discard_or_finish(lock: false) + end + end end + buffer.call end active_jobs diff --git a/app/models/good_job/batch_record.rb b/app/models/good_job/batch_record.rb index 89839500..94d93eae 100644 --- a/app/models/good_job/batch_record.rb +++ b/app/models/good_job/batch_record.rb @@ -54,21 +54,24 @@ def display_attributes def _continue_discard_or_finish(execution = nil, lock: true) execution_discarded = execution && execution.finished_at.present? && execution.error.present? - take_advisory_lock(lock) do - Batch.within_thread(batch_id: nil, batch_callback_id: id) do - reload - if execution_discarded && !discarded_at - update(discarded_at: Time.current) - on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? - end - - if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero? - update(finished_at: Time.current) - on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? - on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? + buffer = GoodJob::Adapter::InlineBuffer.capture do + take_advisory_lock(lock) do + Batch.within_thread(batch_id: nil, batch_callback_id: id) do + reload + if execution_discarded && !discarded_at + update(discarded_at: Time.current) + on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? + end + + if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero? + update(finished_at: Time.current) + on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? + on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? + end end end end + buffer.call end class PropertySerializer @@ -102,7 +105,7 @@ def properties=(value) def take_advisory_lock(value, &block) if value - with_advisory_lock(function: "pg_advisory_lock", &block) + transaction { with_advisory_lock(function: "pg_advisory_xact_lock", &block) } else yield end diff --git a/lib/good_job.rb b/lib/good_job.rb index 7558cd06..72728d92 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -7,6 +7,7 @@ require_relative "good_job/engine" require_relative "good_job/adapter" +require_relative "good_job/adapter/inline_buffer" require_relative "active_job/queue_adapters/good_job_adapter" require_relative "good_job/active_job_extensions/batches" require_relative "good_job/active_job_extensions/concurrency" diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 4b3aff44..d8204827 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -87,15 +87,24 @@ def enqueue_all(active_jobs) end if inline_jobs.any? - @capsule.tracker.register do - until inline_jobs.empty? - inline_job = inline_jobs.shift - perform_inline(inline_job, notify: false) + deferred = InlineBuffer.defer? + InlineBuffer.perform_now_or_defer do + @capsule.tracker.register do + until inline_jobs.empty? + inline_job = inline_jobs.shift + perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false) + end + ensure + inline_jobs.each(&:advisory_unlock) end end end - non_inline_jobs = jobs.reject(&:finished_at) + non_inline_jobs = if InlineBuffer.defer? + jobs - inline_jobs + else + jobs.reject(&:finished_at) + end if non_inline_jobs.any? job_id_to_active_jobs = active_jobs.index_by(&:job_id) non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue| @@ -143,8 +152,10 @@ def enqueue_at(active_job, timestamp) scheduled_at: scheduled_at, create_with_advisory_lock: true ) - @capsule.tracker.register do - perform_inline(job, notify: send_notify?(active_job)) + InlineBuffer.perform_now_or_defer do + @capsule.tracker.register do + perform_inline(job, notify: send_notify?(active_job)) + end end else job = GoodJob::Job.enqueue( @@ -238,6 +249,7 @@ def perform_inline(job, notify: true) end Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current + result ensure job.advisory_unlock job.run_callbacks(:perform_unlocked) diff --git a/lib/good_job/adapter/inline_buffer.rb b/lib/good_job/adapter/inline_buffer.rb new file mode 100644 index 00000000..273b7cc1 --- /dev/null +++ b/lib/good_job/adapter/inline_buffer.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require 'active_support/core_ext/module/attribute_accessors_per_thread' + +module GoodJob + class Adapter + # The InlineBuffer is integrated into the Adapter and captures jobs that have been enqueued inline. + # The purpose is allow job records to be persisted, in a locked state, while within a transaction, + # and then execute the jobs after the transaction has been committed to ensure that the jobs + # do not run within a transaction. + # + # @private This is intended for internal GoodJob usage only. + class InlineBuffer + # @!attribute [rw] current_buffer + # @!scope class + # Current buffer of jobs to be enqueued. + # @return [GoodJob::Adapter::InlineBuffer, nil] + thread_mattr_accessor :current_buffer + + # This block should be used to wrap the transaction that could enqueue jobs. + # @yield The block that may enqueue jobs. + # @return [Proc] A proc that will execute enqueued jobs after the transaction has been committed. + # @example Wrapping a transaction + # buffer = GoodJob::Adapter::InlineBuffer.capture do + # ActiveRecord::Base.transaction do + # MyJob.perform_later + # end + # end + # buffer.call + def self.capture + if current_buffer + yield + return proc {} + end + + begin + self.current_buffer = new + yield + current_buffer.to_proc + ensure + self.current_buffer = nil + end + end + + # Used within the adapter to wrap inline job execution + def self.perform_now_or_defer(&block) + if defer? + current_buffer.defer(block) + else + yield + end + end + + def self.defer? + current_buffer.present? + end + + def initialize + @callables = [] + end + + def defer(callable) + @callables << callable + end + + def to_proc + proc do + @callables.map(&:call) + end + end + end + end +end diff --git a/sorbet/rbi/todo.rbi b/sorbet/rbi/todo.rbi index d4c1fee9..02047d8a 100644 --- a/sorbet/rbi/todo.rbi +++ b/sorbet/rbi/todo.rbi @@ -33,6 +33,8 @@ module ::THREAD_HAS_RUN; end module ::THREAD_JOBS; end module ::TestError; end module ::TestJob; end +module ::SuccessJob; end +module ::ErrorJob; end module ::WAIT_EVENT; end module ::TestJob::Error; end module ::TestJob::ExpectedError; end diff --git a/spec/lib/good_job/adapter/inline_buffer_spec.rb b/spec/lib/good_job/adapter/inline_buffer_spec.rb new file mode 100644 index 00000000..13bc1255 --- /dev/null +++ b/spec/lib/good_job/adapter/inline_buffer_spec.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe GoodJob::Adapter::InlineBuffer do + before do + stub_const 'SuccessJob', (Class.new(ActiveJob::Base) do + def perform + true + end + end) + SuccessJob.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) + + stub_const 'ErrorJob', (Class.new(ActiveJob::Base) do + def perform + raise 'Error' + end + end) + ErrorJob.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) + end + + context "when using enqueue_all" do + it "defers enqueue_all" do + Rails.application.executor.wrap do + buffer = described_class.capture do + SuccessJob.queue_adapter.enqueue_all([SuccessJob.new, SuccessJob.new]) + expect(GoodJob::Job.count).to eq 2 + expect(GoodJob::Job.all).to all have_attributes(finished_at: nil) + end + + buffer.call + + expect(GoodJob::Job.all).to all have_attributes(finished_at: be_present) + end + end + + it "defers enqueue_all with errors" do + Rails.application.executor.wrap do + buffer = described_class.capture do + ErrorJob.queue_adapter.enqueue_all([ErrorJob.new, SuccessJob.new]) + expect(GoodJob::Job.count).to eq 2 + expect(GoodJob::Job.all).to all have_attributes(finished_at: nil) + end + + expect { buffer.call }.to raise_error(/Error/) + expect(GoodJob::Job.find_by(job_class: "ErrorJob")).to have_attributes(finished_at: be_present, error: be_present, error_event: 'unhandled') + expect(GoodJob::Job.find_by(job_class: "SuccessJob")).to have_attributes(finished_at: nil) + end + end + end + + context "when using enqueue" do + it "defers inline enqueued jobs" do + Rails.application.executor.wrap do + buffer = described_class.capture do + SuccessJob.perform_later + expect(GoodJob::Job.count).to eq 1 + end + buffer.call + + expect(GoodJob::Job.count).to eq 1 + expect(GoodJob::Job.first).to have_attributes(finished_at: be_present) + end + end + + it "defers inline enqueued jobs with errors" do + Rails.application.executor.wrap do + buffer = described_class.capture do + ErrorJob.perform_later + expect(GoodJob::Job.count).to eq 1 + end + + expect { buffer.call }.to raise_error(/Error/) + expect(GoodJob::Job.first).to have_attributes(finished_at: be_present, error: be_present, error_event: 'unhandled') + end + end + end +end