diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 6c396dbf..3817c34c 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true require "active_record/version" +require "active_support/lazy_load_hooks" + module Delayed module Backend module ActiveRecord @@ -28,184 +30,186 @@ def self.configure yield(configuration) end - # A job object that is persisted to the database. - # Contains the work object as a YAML field. - class Job < ::ActiveRecord::Base - include Delayed::Backend::Base + ActiveSupport.on_load(:active_record) do + # A job object that is persisted to the database. + # Contains the work object as a YAML field. + class Job < ::ActiveRecord::Base + include Delayed::Backend::Base - if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity) - attr_accessible :priority, :run_at, :queue, :payload_object, - :failed_at, :locked_at, :locked_by, :handler - end + if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity) + attr_accessible :priority, :run_at, :queue, :payload_object, + :failed_at, :locked_at, :locked_by, :handler + end - scope :by_priority, lambda { order("priority ASC, run_at ASC") } - scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority } - scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority } - scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? } + scope :by_priority, lambda { order("priority ASC, run_at ASC") } + scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority } + scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority } + scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? } - before_save :set_default_run_at + before_save :set_default_run_at - def self.set_delayed_job_table_name - delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs" - self.table_name = delayed_job_table_name - end + def self.set_delayed_job_table_name + delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs" + self.table_name = delayed_job_table_name + end - set_delayed_job_table_name + set_delayed_job_table_name - def self.ready_to_run(worker_name, max_run_time) - where( - "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL", - db_time_now, - db_time_now - max_run_time, - worker_name - ) - end + def self.ready_to_run(worker_name, max_run_time) + where( + "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL", + db_time_now, + db_time_now - max_run_time, + worker_name + ) + end - def self.before_fork - if Gem::Version.new("7.1.0") <= Gem::Version.new(::ActiveRecord::VERSION::STRING) - ::ActiveRecord::Base.connection_handler.clear_all_connections!(:all) - else - ::ActiveRecord::Base.connection_handler.clear_all_connections! + def self.before_fork + if Gem::Version.new("7.1.0") <= Gem::Version.new(::ActiveRecord::VERSION::STRING) + ::ActiveRecord::Base.connection_handler.clear_all_connections!(:all) + else + ::ActiveRecord::Base.connection_handler.clear_all_connections! + end end - end - def self.after_fork - ::ActiveRecord::Base.establish_connection - end + def self.after_fork + ::ActiveRecord::Base.establish_connection + end - # When a worker is exiting, make sure we don't have any locked jobs. - def self.clear_locks!(worker_name) - where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil) - end + # When a worker is exiting, make sure we don't have any locked jobs. + def self.clear_locks!(worker_name) + where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil) + end - def self.reserve(worker, max_run_time = Worker.max_run_time) - ready_scope = - ready_to_run(worker.name, max_run_time) - .min_priority - .max_priority - .for_queues - .by_priority + def self.reserve(worker, max_run_time = Worker.max_run_time) + ready_scope = + ready_to_run(worker.name, max_run_time) + .min_priority + .max_priority + .for_queues + .by_priority - reserve_with_scope(ready_scope, worker, db_time_now) - end + reserve_with_scope(ready_scope, worker, db_time_now) + end - def self.reserve_with_scope(ready_scope, worker, now) - case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy - # Optimizations for faster lookups on some common databases - when :optimized_sql - reserve_with_scope_using_optimized_sql(ready_scope, worker, now) - # Slower but in some cases more unproblematic strategy to lookup records - # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details. - when :default_sql - reserve_with_scope_using_default_sql(ready_scope, worker, now) + def self.reserve_with_scope(ready_scope, worker, now) + case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy + # Optimizations for faster lookups on some common databases + when :optimized_sql + reserve_with_scope_using_optimized_sql(ready_scope, worker, now) + # Slower but in some cases more unproblematic strategy to lookup records + # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details. + when :default_sql + reserve_with_scope_using_default_sql(ready_scope, worker, now) + end end - end - def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) - case connection.adapter_name - when "PostgreSQL", "PostGIS" - reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) - when "MySQL", "Mysql2", "Trilogy" - reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) - when "MSSQL", "Teradata" - reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) - # Fallback for unknown / other DBMS - else - reserve_with_scope_using_default_sql(ready_scope, worker, now) + def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) + case connection.adapter_name + when "PostgreSQL", "PostGIS" + reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) + when "MySQL", "Mysql2", "Trilogy" + reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) + when "MSSQL", "Teradata" + reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) + # Fallback for unknown / other DBMS + else + reserve_with_scope_using_default_sql(ready_scope, worker, now) + end end - end - def self.reserve_with_scope_using_default_sql(ready_scope, worker, now) - # This is our old fashion, tried and true, but possibly slower lookup - # Instead of reading the entire job record for our detect loop, we select only the id, - # and only read the full job record after we've successfully locked the job. - # This can have a noticeable impact on large read_ahead configurations and large payload jobs. - ready_scope.limit(worker.read_ahead).select(:id).detect do |job| - count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name) - count == 1 && job.reload + def self.reserve_with_scope_using_default_sql(ready_scope, worker, now) + # This is our old fashion, tried and true, but possibly slower lookup + # Instead of reading the entire job record for our detect loop, we select only the id, + # and only read the full job record after we've successfully locked the job. + # This can have a noticeable impact on large read_ahead configurations and large payload jobs. + ready_scope.limit(worker.read_ahead).select(:id).detect do |job| + count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name) + count == 1 && job.reload + end end - end - def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) - # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT - # This locks the single record 'FOR UPDATE' in the subquery - # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE - # Note: active_record would attempt to generate UPDATE...LIMIT like - # SQL for Postgres if we use a .limit() filter, but it would not - # use 'FOR UPDATE' and we would have many locking conflicts - subquery = ready_scope.limit(1).lock(true).select("id").to_sql - - # On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other - # when attempting to get the next available job - # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE - if connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals - subquery += " SKIP LOCKED" - end - - quoted_name = connection.quote_table_name(table_name) - find_by_sql( - [ - "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *", - now, - worker.name - ] - ).first - end + def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) + # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT + # This locks the single record 'FOR UPDATE' in the subquery + # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE + # Note: active_record would attempt to generate UPDATE...LIMIT like + # SQL for Postgres if we use a .limit() filter, but it would not + # use 'FOR UPDATE' and we would have many locking conflicts + subquery = ready_scope.limit(1).lock(true).select("id").to_sql + + # On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other + # when attempting to get the next available job + # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE + if connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals + subquery += " SKIP LOCKED" + end + + quoted_name = connection.quote_table_name(table_name) + find_by_sql( + [ + "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *", + now, + worker.name + ] + ).first + end - def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) - # Removing the millisecond precision from now(time object) - # MySQL 5.6.4 onwards millisecond precision exists, but the - # datetime object created doesn't have precision, so discarded - # while updating. But during the where clause, for mysql(>=5.6.4), - # it queries with precision as well. So removing the precision - now = now.change(usec: 0) - # This works on MySQL and possibly some other DBs that support - # UPDATE...LIMIT. It uses separate queries to lock and return the job - count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) - return nil if count == 0 - - where(locked_at: now, locked_by: worker.name, failed_at: nil).first - end + def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) + # Removing the millisecond precision from now(time object) + # MySQL 5.6.4 onwards millisecond precision exists, but the + # datetime object created doesn't have precision, so discarded + # while updating. But during the where clause, for mysql(>=5.6.4), + # it queries with precision as well. So removing the precision + now = now.change(usec: 0) + # This works on MySQL and possibly some other DBs that support + # UPDATE...LIMIT. It uses separate queries to lock and return the job + count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) + return nil if count == 0 + + where(locked_at: now, locked_by: worker.name, failed_at: nil).first + end - def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) - # The MSSQL driver doesn't generate a limit clause when update_all - # is called directly - subsubquery_sql = ready_scope.limit(1).to_sql - # select("id") doesn't generate a subquery, so force a subquery - subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x" - quoted_table_name = connection.quote_table_name(table_name) - sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})" - count = connection.execute(sanitize_sql([sql, now, worker.name])) - return nil if count == 0 - - # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row - where(locked_at: now, locked_by: worker.name, failed_at: nil).first - end + def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) + # The MSSQL driver doesn't generate a limit clause when update_all + # is called directly + subsubquery_sql = ready_scope.limit(1).to_sql + # select("id") doesn't generate a subquery, so force a subquery + subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x" + quoted_table_name = connection.quote_table_name(table_name) + sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})" + count = connection.execute(sanitize_sql([sql, now, worker.name])) + return nil if count == 0 + + # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row + where(locked_at: now, locked_by: worker.name, failed_at: nil).first + end - # Get the current time (GMT or local depending on DB) - # Note: This does not ping the DB to get the time, so all your clients - # must have synchronized clocks. - def self.db_time_now - if Time.zone - Time.zone.now - elsif default_timezone == :utc - Time.now.utc - else - Time.now # rubocop:disable Rails/TimeZone + # Get the current time (GMT or local depending on DB) + # Note: This does not ping the DB to get the time, so all your clients + # must have synchronized clocks. + def self.db_time_now + if Time.zone + Time.zone.now + elsif default_timezone == :utc + Time.now.utc + else + Time.now # rubocop:disable Rails/TimeZone + end end - end - def self.default_timezone - if ::ActiveRecord.respond_to?(:default_timezone) - ::ActiveRecord.default_timezone - else - ::ActiveRecord::Base.default_timezone + def self.default_timezone + if ::ActiveRecord.respond_to?(:default_timezone) + ::ActiveRecord.default_timezone + else + ::ActiveRecord::Base.default_timezone + end end - end - def reload(*args) - reset - super + def reload(*args) + reset + super + end end end end