Skip to content

Commit

Permalink
Refactor inline adapter to enable deferred execution after enqueue to…
Browse files Browse the repository at this point in the history
… allow batch-callbacks to use transaction-based advisory lock
  • Loading branch information
bensheldon committed Jul 17, 2024
1 parent c5bcbfb commit 1aa8e46
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 27 deletions.
8 changes: 6 additions & 2 deletions app/models/concerns/good_job/advisory_lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc
if unlock_session
advisory_unlock_session
else
unlock_function = advisory_unlockable_function(function)
records.each do |record|
record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function))
record.advisory_unlock(key: record.lockable_column_key(column: column), function: unlock_function) if unlock_function
end
end
end
Expand Down Expand Up @@ -209,7 +210,8 @@ def advisory_lock_key(key, function: advisory_lockable_function)
begin
yield
ensure
advisory_unlock_key(key, function: advisory_unlockable_function(function))
unlock_function = advisory_unlockable_function(function)
advisory_unlock_key(key, function: unlock_function) if unlock_function
end
end

Expand Down Expand Up @@ -284,6 +286,8 @@ def supports_cte_materialization_specifiers?
# @param function [String, Symbol] name of advisory lock or unlock function
# @return [Boolean]
def advisory_unlockable_function(function = advisory_lockable_function)
return nil if function.include? "_xact_"

function.to_s.sub("_lock", "_unlock").sub("_try_", "_")
end

Expand Down
15 changes: 10 additions & 5 deletions app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,17 @@ def enqueue(active_jobs = [], **properties, &block)
active_jobs = add(active_jobs, &block)

Rails.application.executor.wrap do
record.with_advisory_lock(function: "pg_advisory_lock") do
record.update!(enqueued_at: Time.current)

# During inline execution, this could enqueue and execute further jobs
record._continue_discard_or_finish(lock: false)
buffer = GoodJob::Adapter::InlineBuffer.capture do
record.transaction do
record.with_advisory_lock(function: "pg_advisory_xact_lock") do
record.update!(enqueued_at: Time.current)

# During inline execution, this could enqueue and execute further jobs
record._continue_discard_or_finish(lock: false)
end
end
end
buffer.call
end

active_jobs
Expand Down
29 changes: 16 additions & 13 deletions app/models/good_job/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,24 @@ def display_attributes

def _continue_discard_or_finish(execution = nil, lock: true)
execution_discarded = execution && execution.finished_at.present? && execution.error.present?
take_advisory_lock(lock) do
Batch.within_thread(batch_id: nil, batch_callback_id: id) do
reload
if execution_discarded && !discarded_at
update(discarded_at: Time.current)
on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present?
end

if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero?
update(finished_at: Time.current)
on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present?
on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present?
buffer = GoodJob::Adapter::InlineBuffer.capture do
take_advisory_lock(lock) do
Batch.within_thread(batch_id: nil, batch_callback_id: id) do
reload
if execution_discarded && !discarded_at
update(discarded_at: Time.current)
on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present?
end

if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero?
update(finished_at: Time.current)
on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present?
on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present?
end
end
end
end
buffer.call
end

class PropertySerializer
Expand Down Expand Up @@ -102,7 +105,7 @@ def properties=(value)

def take_advisory_lock(value, &block)
if value
with_advisory_lock(function: "pg_advisory_lock", &block)
transaction { with_advisory_lock(function: "pg_advisory_xact_lock", &block) }
else
yield
end
Expand Down
1 change: 1 addition & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require_relative "good_job/engine"

require_relative "good_job/adapter"
require_relative "good_job/adapter/inline_buffer"
require_relative "active_job/queue_adapters/good_job_adapter"
require_relative "good_job/active_job_extensions/batches"
require_relative "good_job/active_job_extensions/concurrency"
Expand Down
26 changes: 19 additions & 7 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,24 @@ def enqueue_all(active_jobs)
end

if inline_jobs.any?
@capsule.tracker.register do
until inline_jobs.empty?
inline_job = inline_jobs.shift
perform_inline(inline_job, notify: false)
deferred = InlineBuffer.defer?
InlineBuffer.perform_now_or_defer do
@capsule.tracker.register do
until inline_jobs.empty?
inline_job = inline_jobs.shift
perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false)
end
ensure
inline_jobs.each(&:advisory_unlock)
end
end
end

non_inline_jobs = jobs.reject(&:finished_at)
non_inline_jobs = if InlineBuffer.defer?
jobs - inline_jobs
else
jobs.reject(&:finished_at)
end
if non_inline_jobs.any?
job_id_to_active_jobs = active_jobs.index_by(&:job_id)
non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue|
Expand Down Expand Up @@ -143,8 +152,10 @@ def enqueue_at(active_job, timestamp)
scheduled_at: scheduled_at,
create_with_advisory_lock: true
)
@capsule.tracker.register do
perform_inline(job, notify: send_notify?(active_job))
InlineBuffer.perform_now_or_defer do
@capsule.tracker.register do
perform_inline(job, notify: send_notify?(active_job))
end
end
else
job = GoodJob::Job.enqueue(
Expand Down Expand Up @@ -238,6 +249,7 @@ def perform_inline(job, notify: true)
end

Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current
result
ensure
job.advisory_unlock
job.run_callbacks(:perform_unlocked)
Expand Down
73 changes: 73 additions & 0 deletions lib/good_job/adapter/inline_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

require 'active_support/core_ext/module/attribute_accessors_per_thread'

module GoodJob
class Adapter
# The InlineBuffer is integrated into the Adapter and captures jobs that have been enqueued inline.
# The purpose is allow job records to be persisted, in a locked state, while within a transaction,
# and then execute the jobs after the transaction has been committed to ensure that the jobs
# do not run within a transaction.
#
# @private This is intended for internal GoodJob usage only.
class InlineBuffer
# @!attribute [rw] current_buffer
# @!scope class
# Current buffer of jobs to be enqueued.
# @return [GoodJob::Adapter::InlineBuffer, nil]
thread_mattr_accessor :current_buffer

# This block should be used to wrap the transaction that could enqueue jobs.
# @yield The block that may enqueue jobs.
# @return [Proc] A proc that will execute enqueued jobs after the transaction has been committed.
# @example Wrapping a transaction
# buffer = GoodJob::Adapter::InlineBuffer.capture do
# ActiveRecord::Base.transaction do
# MyJob.perform_later
# end
# end
# buffer.call
def self.capture
if current_buffer
yield
return proc {}
end

begin
self.current_buffer = new
yield
current_buffer.to_proc
ensure
self.current_buffer = nil
end
end

# Used within the adapter to wrap inline job execution
def self.perform_now_or_defer(&block)
if defer?
current_buffer.defer(block)
else
yield
end
end

def self.defer?
current_buffer.present?
end

def initialize
@callables = []
end

def defer(callable)
@callables << callable
end

def to_proc
proc do
@callables.map(&:call)
end
end
end
end
end
2 changes: 2 additions & 0 deletions sorbet/rbi/todo.rbi
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module ::THREAD_HAS_RUN; end
module ::THREAD_JOBS; end
module ::TestError; end
module ::TestJob; end
module ::SuccessJob; end
module ::ErrorJob; end
module ::WAIT_EVENT; end
module ::TestJob::Error; end
module ::TestJob::ExpectedError; end
Expand Down
78 changes: 78 additions & 0 deletions spec/lib/good_job/adapter/inline_buffer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

require 'rails_helper'

describe GoodJob::Adapter::InlineBuffer do
before do
stub_const 'SuccessJob', (Class.new(ActiveJob::Base) do
def perform
true
end
end)
SuccessJob.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)

stub_const 'ErrorJob', (Class.new(ActiveJob::Base) do
def perform
raise 'Error'
end
end)
ErrorJob.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
end

context "when using enqueue_all" do
it "defers enqueue_all" do
Rails.application.executor.wrap do
buffer = described_class.capture do
SuccessJob.queue_adapter.enqueue_all([SuccessJob.new, SuccessJob.new])
expect(GoodJob::Job.count).to eq 2
expect(GoodJob::Job.all).to all have_attributes(finished_at: nil)
end

buffer.call

expect(GoodJob::Job.all).to all have_attributes(finished_at: be_present)
end
end

it "defers enqueue_all with errors" do
Rails.application.executor.wrap do
buffer = described_class.capture do
ErrorJob.queue_adapter.enqueue_all([ErrorJob.new, SuccessJob.new])
expect(GoodJob::Job.count).to eq 2
expect(GoodJob::Job.all).to all have_attributes(finished_at: nil)
end

expect { buffer.call }.to raise_error(/Error/)
expect(GoodJob::Job.find_by(job_class: "ErrorJob")).to have_attributes(finished_at: be_present, error: be_present, error_event: 'unhandled')
expect(GoodJob::Job.find_by(job_class: "SuccessJob")).to have_attributes(finished_at: nil)
end
end
end

context "when using enqueue" do
it "defers inline enqueued jobs" do
Rails.application.executor.wrap do
buffer = described_class.capture do
SuccessJob.perform_later
expect(GoodJob::Job.count).to eq 1
end
buffer.call

expect(GoodJob::Job.count).to eq 1
expect(GoodJob::Job.first).to have_attributes(finished_at: be_present)
end
end

it "defers inline enqueued jobs with errors" do
Rails.application.executor.wrap do
buffer = described_class.capture do
ErrorJob.perform_later
expect(GoodJob::Job.count).to eq 1
end

expect { buffer.call }.to raise_error(/Error/)
expect(GoodJob::Job.first).to have_attributes(finished_at: be_present, error: be_present, error_event: 'unhandled')
end
end
end
end

0 comments on commit 1aa8e46

Please sign in to comment.