Skip to content

Commit

Permalink
Made Delayed::Backend::ActiveRecord::Job class load lazily
Browse files Browse the repository at this point in the history
In Rails, framework classes such as `ActiveRecord::Base` are designed to be loaded lazily. If a gem references them as soon as it is required (as this gem did), `ActiveRecord::Base` is loaded before the application's `config/initializers/*.rb` files have run, so ActiveRecord configuration written in those initializers may not be applied properly. To avoid this, the `Delayed::Backend::ActiveRecord::Job` class definition is now placed inside an `ActiveSupport.on_load(:active_record) do ... end` block, which defers defining `Job` (and therefore referencing `ActiveRecord::Base`) until Rails itself loads `ActiveRecord::Base`, instead of doing so at the moment the gem is required.

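In this commit, I wrapped the existing `Delayed::Backend::ActiveRecord::Job` class definition in that block as-is to keep the change minimal; the class body is unchanged apart from its indentation, which is why the diff touches almost every line. As a follow-up, it might be better to move `Delayed::Backend::ActiveRecord::Configuration` into its own dedicated file for easier management.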
  • Loading branch information
willnet committed Jan 20, 2025
1 parent e5b4fbf commit 2c07740
Showing 1 changed file with 156 additions and 152 deletions.
308 changes: 156 additions & 152 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require "active_record/version"
require "active_support/lazy_load_hooks"

module Delayed
module Backend
module ActiveRecord
Expand Down Expand Up @@ -28,184 +30,186 @@ def self.configure
yield(configuration)
end

# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base
ActiveSupport.on_load(:active_record) do
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base

if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity)
attr_accessible :priority, :run_at, :queue, :payload_object,
:failed_at, :locked_at, :locked_by, :handler
end
if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity)
attr_accessible :priority, :run_at, :queue, :payload_object,
:failed_at, :locked_at, :locked_by, :handler
end

scope :by_priority, lambda { order("priority ASC, run_at ASC") }
scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }
scope :by_priority, lambda { order("priority ASC, run_at ASC") }
scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }

before_save :set_default_run_at
before_save :set_default_run_at

def self.set_delayed_job_table_name
delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
self.table_name = delayed_job_table_name
end
def self.set_delayed_job_table_name
delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
self.table_name = delayed_job_table_name
end

set_delayed_job_table_name
set_delayed_job_table_name

def self.ready_to_run(worker_name, max_run_time)
where(
"((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
db_time_now,
db_time_now - max_run_time,
worker_name
)
end
def self.ready_to_run(worker_name, max_run_time)
where(
"((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
db_time_now,
db_time_now - max_run_time,
worker_name
)
end

def self.before_fork
if Gem::Version.new("7.1.0") <= Gem::Version.new(::ActiveRecord::VERSION::STRING)
::ActiveRecord::Base.connection_handler.clear_all_connections!(:all)
else
::ActiveRecord::Base.connection_handler.clear_all_connections!
def self.before_fork
if Gem::Version.new("7.1.0") <= Gem::Version.new(::ActiveRecord::VERSION::STRING)
::ActiveRecord::Base.connection_handler.clear_all_connections!(:all)
else
::ActiveRecord::Base.connection_handler.clear_all_connections!
end
end
end

def self.after_fork
::ActiveRecord::Base.establish_connection
end
def self.after_fork
::ActiveRecord::Base.establish_connection
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil)
end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil)
end

def self.reserve(worker, max_run_time = Worker.max_run_time)
ready_scope =
ready_to_run(worker.name, max_run_time)
.min_priority
.max_priority
.for_queues
.by_priority
def self.reserve(worker, max_run_time = Worker.max_run_time)
ready_scope =
ready_to_run(worker.name, max_run_time)
.min_priority
.max_priority
.for_queues
.by_priority

reserve_with_scope(ready_scope, worker, db_time_now)
end
reserve_with_scope(ready_scope, worker, db_time_now)
end

def self.reserve_with_scope(ready_scope, worker, now)
case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy
# Optimizations for faster lookups on some common databases
when :optimized_sql
reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
# Slower but in some cases more unproblematic strategy to lookup records
# See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
when :default_sql
reserve_with_scope_using_default_sql(ready_scope, worker, now)
def self.reserve_with_scope(ready_scope, worker, now)
case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy
# Optimizations for faster lookups on some common databases
when :optimized_sql
reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
# Slower but in some cases more unproblematic strategy to lookup records
# See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
when :default_sql
reserve_with_scope_using_default_sql(ready_scope, worker, now)
end
end
end

def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
case connection.adapter_name
when "PostgreSQL", "PostGIS"
reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
when "MySQL", "Mysql2", "Trilogy"
reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
when "MSSQL", "Teradata"
reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
# Fallback for unknown / other DBMS
else
reserve_with_scope_using_default_sql(ready_scope, worker, now)
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
case connection.adapter_name
when "PostgreSQL", "PostGIS"
reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
when "MySQL", "Mysql2", "Trilogy"
reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
when "MSSQL", "Teradata"
reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
# Fallback for unknown / other DBMS
else
reserve_with_scope_using_default_sql(ready_scope, worker, now)
end
end
end

def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
# This is our old fashion, tried and true, but possibly slower lookup
# Instead of reading the entire job record for our detect loop, we select only the id,
# and only read the full job record after we've successfully locked the job.
# This can have a noticeable impact on large read_ahead configurations and large payload jobs.
ready_scope.limit(worker.read_ahead).select(:id).detect do |job|
count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name)
count == 1 && job.reload
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
# This is our old fashion, tried and true, but possibly slower lookup
# Instead of reading the entire job record for our detect loop, we select only the id,
# and only read the full job record after we've successfully locked the job.
# This can have a noticeable impact on large read_ahead configurations and large payload jobs.
ready_scope.limit(worker.read_ahead).select(:id).detect do |job|
count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name)
count == 1 && job.reload
end
end
end

def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
# This locks the single record 'FOR UPDATE' in the subquery
# http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
subquery = ready_scope.limit(1).lock(true).select("id").to_sql

# On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other
# when attempting to get the next available job
# https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
if connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
subquery += " SKIP LOCKED"
end

quoted_name = connection.quote_table_name(table_name)
find_by_sql(
[
"UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *",
now,
worker.name
]
).first
end
def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
# This locks the single record 'FOR UPDATE' in the subquery
# http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
subquery = ready_scope.limit(1).lock(true).select("id").to_sql

# On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other
# when attempting to get the next available job
# https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
if connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
subquery += " SKIP LOCKED"
end

quoted_name = connection.quote_table_name(table_name)
find_by_sql(
[
"UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *",
now,
worker.name
]
).first
end

def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
# Removing the millisecond precision from now(time object)
# MySQL 5.6.4 onwards millisecond precision exists, but the
# datetime object created doesn't have precision, so discarded
# while updating. But during the where clause, for mysql(>=5.6.4),
# it queries with precision as well. So removing the precision
now = now.change(usec: 0)
# This works on MySQL and possibly some other DBs that support
# UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
return nil if count == 0

where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end
def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
# Removing the millisecond precision from now(time object)
# MySQL 5.6.4 onwards millisecond precision exists, but the
# datetime object created doesn't have precision, so discarded
# while updating. But during the where clause, for mysql(>=5.6.4),
# it queries with precision as well. So removing the precision
now = now.change(usec: 0)
# This works on MySQL and possibly some other DBs that support
# UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
return nil if count == 0

where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end

def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
# The MSSQL driver doesn't generate a limit clause when update_all
# is called directly
subsubquery_sql = ready_scope.limit(1).to_sql
# select("id") doesn't generate a subquery, so force a subquery
subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x"
quoted_table_name = connection.quote_table_name(table_name)
sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})"
count = connection.execute(sanitize_sql([sql, now, worker.name]))
return nil if count == 0

# MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row
where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end
def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
# The MSSQL driver doesn't generate a limit clause when update_all
# is called directly
subsubquery_sql = ready_scope.limit(1).to_sql
# select("id") doesn't generate a subquery, so force a subquery
subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x"
quoted_table_name = connection.quote_table_name(table_name)
sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})"
count = connection.execute(sanitize_sql([sql, now, worker.name]))
return nil if count == 0

# MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row
where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end

# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have synchronized clocks.
def self.db_time_now
if Time.zone
Time.zone.now
elsif default_timezone == :utc
Time.now.utc
else
Time.now # rubocop:disable Rails/TimeZone
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have synchronized clocks.
def self.db_time_now
if Time.zone
Time.zone.now
elsif default_timezone == :utc
Time.now.utc
else
Time.now # rubocop:disable Rails/TimeZone
end
end
end

def self.default_timezone
if ::ActiveRecord.respond_to?(:default_timezone)
::ActiveRecord.default_timezone
else
::ActiveRecord::Base.default_timezone
def self.default_timezone
if ::ActiveRecord.respond_to?(:default_timezone)
::ActiveRecord.default_timezone
else
::ActiveRecord::Base.default_timezone
end
end
end

def reload(*args)
reset
super
def reload(*args)
reset
super
end
end
end
end
Expand Down

0 comments on commit 2c07740

Please sign in to comment.