diff options
Diffstat (limited to 'activejob/lib/active_job')
27 files changed, 1230 insertions, 0 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb new file mode 100644 index 0000000000..099ea4d169 --- /dev/null +++ b/activejob/lib/active_job/arguments.rb @@ -0,0 +1,81 @@ +module ActiveJob + # Raised when an exception is raised during job arguments deserialization. + # + # Wraps the original exception raised as +original_exception+. + class DeserializationError < StandardError + attr_reader :original_exception + + def initialize(e) #:nodoc: + super("Error while trying to deserialize arguments: #{e.message}") + @original_exception = e + set_backtrace e.backtrace + end + end + + # Raised when an unsupported argument type is being set as job argument. We + # currently support NilClass, Fixnum, Float, String, TrueClass, FalseClass, + # Bignum and object that can be represented as GlobalIDs (ex: Active Record). + # Also raised if you set the key for a Hash something else than a string or + # a symbol. + class SerializationError < ArgumentError + end + + module Arguments + extend self + TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ] + + def serialize(arguments) + arguments.map { |argument| serialize_argument(argument) } + end + + def deserialize(arguments) + arguments.map { |argument| deserialize_argument(argument) } + rescue => e + raise DeserializationError.new(e) + end + + private + def serialize_argument(argument) + case argument + when *TYPE_WHITELIST + argument + when GlobalID::Identification + argument.to_global_id.to_s + when Array + argument.map { |arg| serialize_argument(arg) } + when Hash + argument.each_with_object({}) do |(key, value), hash| + hash[serialize_hash_key(key)] = serialize_argument(value) + end + else + raise SerializationError.new("Unsupported argument type: #{argument.class.name}") + end + end + + def deserialize_argument(argument) + case argument + when String + GlobalID::Locator.locate(argument) || argument + when *TYPE_WHITELIST + argument + when Array + argument.map { |arg| deserialize_argument(arg) } + when Hash + argument.each_with_object({}.with_indifferent_access) do |(key, value), hash| + hash[key] = deserialize_argument(value) + end + else + raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" + end + end + + def serialize_hash_key(key) + case key + when String, Symbol + key.to_s + else + raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}") + end + end + end +end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb new file mode 100644 index 0000000000..a3bec1f827 --- /dev/null +++ b/activejob/lib/active_job/base.rb @@ -0,0 +1,21 @@ +require 'active_job/core' +require 'active_job/queue_adapter' +require 'active_job/queue_name' +require 'active_job/enqueuing' +require 'active_job/execution' +require 'active_job/callbacks' +require 'active_job/logging' + +module ActiveJob + class Base + include Core + include QueueAdapter + include QueueName + include Enqueuing + include Execution + include Callbacks + include Logging + + ActiveSupport.run_load_hooks(:active_job, self) + end +end diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb new file mode 100644 index 0000000000..cafa3438c0 --- /dev/null +++ b/activejob/lib/active_job/callbacks.rb @@ -0,0 +1,144 @@ +require 'active_support/callbacks' + +module ActiveJob + # = Active Job Callbacks + # + # Active Job provides hooks during the lifecycle of a job. Callbacks allow you + # to trigger logic during the lifecycle of a job. Available callbacks are: + # + # * <tt>before_enqueue</tt> + # * <tt>around_enqueue</tt> + # * <tt>after_enqueue</tt> + # * <tt>before_perform</tt> + # * <tt>around_perform</tt> + # * <tt>after_perform</tt> + # + module Callbacks + extend ActiveSupport::Concern + include ActiveSupport::Callbacks + + included do + define_callbacks :perform + define_callbacks :enqueue + end + + module ClassMethods + # Defines a callback that will get called right before the + # job's perform method is executed. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # before_perform do |job| + # UserMailer.notify_video_started_processing(job.arguments.first) + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def before_perform(*filters, &blk) + set_callback(:perform, :before, *filters, &blk) + end + + # Defines a callback that will get called right after the + # job's perform method has finished. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # after_perform do |job| + # UserMailer.notify_video_processed(job.arguments.first) + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def after_perform(*filters, &blk) + set_callback(:perform, :after, *filters, &blk) + end + + # Defines a callback that will get called around the job's perform method. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # around_perform do |job, block| + # UserMailer.notify_video_started_processing(job.arguments.first) + # block.call + # UserMailer.notify_video_processed(job.arguments.first) + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def around_perform(*filters, &blk) + set_callback(:perform, :around, *filters, &blk) + end + + # Defines a callback that will get called right before the + # job is enqueued. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # before_enqueue do |job| + # $statsd.increment "enqueue-video-job.try" + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def before_enqueue(*filters, &blk) + set_callback(:enqueue, :before, *filters, &blk) + end + + # Defines a callback that will get called right after the + # job is enqueued. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # after_enqueue do |job| + # $statsd.increment "enqueue-video-job.success" + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def after_enqueue(*filters, &blk) + set_callback(:enqueue, :after, *filters, &blk) + end + + # Defines a callback that will get called before and after the + # job is enqueued. + # + # class VideoProcessJob < ActiveJob::Base + # queue_as :default + # + # around_enqueue do |job, block| + # $statsd.time "video-job.process" do + # block.call + # end + # end + # + # def perform(video_id) + # Video.find(video_id).process + # end + # end + # + def around_enqueue(*filters, &blk) + set_callback(:enqueue, :around, *filters, &blk) + end + end + end +end diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb new file mode 100644 index 0000000000..979280b910 --- /dev/null +++ b/activejob/lib/active_job/configured_job.rb @@ -0,0 +1,16 @@ +module ActiveJob + class ConfiguredJob #:nodoc: + def initialize(job_class, options={}) + @options = options + @job_class = job_class + end + + def perform_now(*args) + @job_class.new(*args).perform_now + end + + def perform_later(*args) + @job_class.new(*args).enqueue @options + end + end +end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..b6dd03a0bc --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,89 @@ +module ActiveJob + module Core + extend ActiveSupport::Concern + + included do + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue on which the job should be run on. + attr_writer :queue_name + end + + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data['job_class'].constantize.new + job.job_id = job_data['job_id'] + job.queue_name = job_data['queue_name'] + job.serialized_arguments = job_data['arguments'] + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last) + def set(options={}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes as arguments the arguments that + # will be passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + end + + # Returns a hash with the job data that can safely be passed to the + # queueing adapter. + def serialize + { + 'job_class' => self.class.name, + 'job_id' => job_id, + 'queue_name' => queue_name, + 'arguments' => serialize_arguments(arguments) + } + end + + private + def deserialize_arguments_if_needed + if defined?(@serialized_arguments) && @serialized_arguments.present? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(serialized_args) + Arguments.serialize(serialized_args) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + end +end + + + diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb new file mode 100644 index 0000000000..e8bc44cbc4 --- /dev/null +++ b/activejob/lib/active_job/enqueuing.rb @@ -0,0 +1,75 @@ +require 'active_job/arguments' + +module ActiveJob + module Enqueuing + extend ActiveSupport::Concern + + module ClassMethods + # Push a job onto the queue. The arguments must be legal JSON types + # (string, int, float, nil, true, false, hash or array) or + # GlobalID::Identification instances. Arbitrary Ruby objects + # are not supported. + # + # Returns an instance of the job class queued with args available in + # Job#arguments. + def perform_later(*args) + job_or_instantiate(*args).enqueue + end + + protected + def job_or_instantiate(*args) + args.first.is_a?(self) ? args.first : new(*args) + end + end + + # Reschedule the job to be re-executed. This is usefull in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # class SiteScrapperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options={}) + enqueue options + end + + # Equeue the job to be performed by the queue adapter. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # my_job_instance.enqueue + # my_job_instance.enqueue wait: 5.minutes + # my_job_instance.enqueue queue: :important + # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + def enqueue(options={}) + self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] + self.scheduled_at = options[:wait_until].to_f if options[:wait_until] + self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + run_callbacks :enqueue do + if self.scheduled_at + self.class.queue_adapter.enqueue_at self, self.scheduled_at + else + self.class.queue_adapter.enqueue self + end + end + self + end + end +end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb new file mode 100644 index 0000000000..d6d67c46e3 --- /dev/null +++ b/activejob/lib/active_job/execution.rb @@ -0,0 +1,41 @@ +require 'active_support/rescuable' +require 'active_job/arguments' + +module ActiveJob + module Execution + extend ActiveSupport::Concern + include ActiveSupport::Rescuable + + module ClassMethods + # Performs the job immediately. + # + # MyJob.perform_now("mike") + # + def perform_now(*args) + job_or_instantiate(*args).perform_now + end + + def execute(job_data) #:nodoc: + job = deserialize(job_data) + job.perform_now + end + end + + # Performs the job immediately. The job is not sent to the queueing adapter + # and will block the execution until it's finished. + # + # MyJob.new(*args).perform_now + def perform_now + deserialize_arguments_if_needed + run_callbacks :perform do + perform(*arguments) + end + rescue => exception + rescue_with_handler(exception) || raise(exception) + end + + def perform(*) + fail NotImplementedError + end + end +end diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb new file mode 100644 index 0000000000..1f25d48326 --- /dev/null +++ b/activejob/lib/active_job/gem_version.rb @@ -0,0 +1,15 @@ +module ActiveJob + # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt> + def self.gem_version + Gem::Version.new VERSION::STRING + end + + module VERSION + MAJOR = 4 + MINOR = 2 + TINY = 0 + PRE = "beta1" + + STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") + end +end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb new file mode 100644 index 0000000000..962005cd15 --- /dev/null +++ b/activejob/lib/active_job/logging.rb @@ -0,0 +1,102 @@ +require 'active_support/core_ext/string/filters' +require 'active_support/tagged_logging' +require 'active_support/logger' + +module ActiveJob + module Logging + extend ActiveSupport::Concern + + included do + cattr_accessor(:logger) { ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT)) } + + around_enqueue do |_, block, _| + tag_logger do + block.call + end + end + + around_perform do |job, block, _| + tag_logger(job.class.name, job.job_id) do + payload = {adapter: job.class.queue_adapter, job: job} + ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) + ActiveSupport::Notifications.instrument("perform.active_job", payload) do + block.call + end + end + end + + before_enqueue do |job| + if job.scheduled_at + ActiveSupport::Notifications.instrument "enqueue_at.active_job", + adapter: job.class.queue_adapter, job: job + else + ActiveSupport::Notifications.instrument "enqueue.active_job", + adapter: job.class.queue_adapter, job: job + end + end + end + + private + def tag_logger(*tags) + if logger.respond_to?(:tagged) + tags.unshift "ActiveJob" unless logger_tagged_by_active_job? + ActiveJob::Base.logger.tagged(*tags){ yield } + else + yield + end + end + + def logger_tagged_by_active_job? + logger.formatter.current_tags.include?("ActiveJob") + end + + class LogSubscriber < ActiveSupport::LogSubscriber + def enqueue(event) + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) + end + end + + def enqueue_at(event) + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) + end + end + + def perform_start(event) + info do + job = event.payload[:job] + "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) + end + end + + def perform(event) + info do + job = event.payload[:job] + "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" + end + end + + private + def queue_name(event) + event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" + end + + def args_info(job) + job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : "" + end + + def scheduled_at(event) + Time.at(event.payload[:job].scheduled_at).utc + end + + def logger + ActiveJob::Base.logger + end + end + end +end + +ActiveJob::Logging::LogSubscriber.attach_to :active_job diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb new file mode 100644 index 0000000000..fb54aec75e --- /dev/null +++ b/activejob/lib/active_job/queue_adapter.rb @@ -0,0 +1,29 @@ +require 'active_job/queue_adapters/inline_adapter' +require 'active_support/core_ext/string/inflections' + +module ActiveJob + module QueueAdapter + extend ActiveSupport::Concern + + module ClassMethods + mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } + + def queue_adapter=(name_or_adapter) + @@queue_adapter = \ + case name_or_adapter + when :test + ActiveJob::QueueAdapters::TestAdapter.new + when Symbol, String + load_adapter(name_or_adapter) + when Class + name_or_adapter + end + end + + private + def load_adapter(name) + "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + end + end + end +end
\ No newline at end of file diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb new file mode 100644 index 0000000000..345b01ef00 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters.rb @@ -0,0 +1,17 @@ +module ActiveJob + module QueueAdapters + extend ActiveSupport::Autoload + + autoload :InlineAdapter + autoload :BackburnerAdapter + autoload :DelayedJobAdapter + autoload :QuAdapter + autoload :QueAdapter + autoload :QueueClassicAdapter + autoload :ResqueAdapter + autoload :SidekiqAdapter + autoload :SneakersAdapter + autoload :SuckerPunchAdapter + autoload :TestAdapter + end +end diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb new file mode 100644 index 0000000000..e1b00f1de7 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -0,0 +1,26 @@ +require 'backburner' + +module ActiveJob + module QueueAdapters + class BackburnerAdapter + class << self + def enqueue(job) + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name + end + + def enqueue_at(job, timestamp) + delay = timestamp - Time.current.to_f + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay + end + end + + class JobWrapper + class << self + def perform(job_data) + Base.execute job_data + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb new file mode 100644 index 0000000000..658799edfc --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -0,0 +1,23 @@ +require 'delayed_job' + +module ActiveJob + module QueueAdapters + class DelayedJobAdapter + class << self + def enqueue(job) + JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize) + end + + def enqueue_at(job, timestamp) + JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize) + end + end + + class JobWrapper + def perform(job_data) + Base.execute(job_data) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb new file mode 100644 index 0000000000..fdefa38d5e --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -0,0 +1,15 @@ +module ActiveJob + module QueueAdapters + class InlineAdapter + class << self + def enqueue(job) + Base.execute(job.serialize) + end + + def enqueue_at(*) + raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob") + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb new file mode 100644 index 0000000000..f681fd7e8a --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -0,0 +1,29 @@ +require 'qu' + +module ActiveJob + module QueueAdapters + class QuAdapter + class << self + def enqueue(job, *args) + Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| + payload.instance_variable_set(:@queue, job.queue_name) + end.push + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper < Qu::Job + def initialize(job_data) + @job_data = job_data + end + + def perform + Base.execute @job_data + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb new file mode 100644 index 0000000000..51891ab07b --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -0,0 +1,23 @@ +require 'que' + +module ActiveJob + module QueueAdapters + class QueAdapter + class << self + def enqueue(job) + JobWrapper.enqueue job.serialize, queue: job.queue_name + end + + def enqueue_at(job, timestamp) + JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) + end + end + + class JobWrapper < Que::Job + def run(job_data) + Base.execute job_data + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb new file mode 100644 index 0000000000..ddcc868317 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -0,0 +1,40 @@ +require 'queue_classic' + +module ActiveJob + module QueueAdapters + class QueueClassicAdapter + class << self + def enqueue(job) + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + end + + def enqueue_at(job, timestamp) + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' + 'You can implement this yourself or you can use the queue_classic-later gem.' + end + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + end + + # Builds a <tt>QC::Queue</tt> object to schedule jobs on. + # + # If you have a custom <tt>QC::Queue</tt> subclass you'll need to suclass + # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the + # <tt>build_queue</tt> method. + def build_queue(queue_name) + QC::Queue.new(queue_name) + end + end + + class JobWrapper + class << self + def perform(job_data) + Base.execute job_data + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb new file mode 100644 index 0000000000..affa3bdfbc --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -0,0 +1,41 @@ +require 'resque' +require 'active_support/core_ext/enumerable' +require 'active_support/core_ext/array/access' + +begin + require 'resque-scheduler' +rescue LoadError + begin + require 'resque_scheduler' + rescue LoadError + false + end +end + +module ActiveJob + module QueueAdapters + class ResqueAdapter + class << self + def enqueue(job) + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize + end + + def enqueue_at(job, timestamp) + unless Resque.respond_to?(:enqueue_at_with_queue) + raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ + "resque-scheduler gem. Please add it to your Gemfile and run bundle install" + end + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize + end + end + + class JobWrapper + class << self + def perform(job_data) + Base.execute job_data + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb new file mode 100644 index 0000000000..79926a1e24 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -0,0 +1,35 @@ +require 'sidekiq' + +module ActiveJob + module QueueAdapters + class SidekiqAdapter + class << self + def enqueue(job) + #Sidekiq::Client does not support symbols as keys + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job.serialize ], + 'retry' => true + end + + def enqueue_at(job, timestamp) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job.serialize ], + 'retry' => true, + 'at' => timestamp + end + end + + class JobWrapper + include Sidekiq::Worker + + def perform(job_data) + Base.execute job_data + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb new file mode 100644 index 0000000000..1ab0a87485 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -0,0 +1,34 @@ +require 'sneakers' +require 'thread' + +module ActiveJob + module QueueAdapters + class SneakersAdapter + @monitor = Monitor.new + + class << self + def enqueue(job) + @monitor.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) + end + end + + def enqueue_at(job, timestamp) + raise NotImplementedError + end + end + + class JobWrapper + include Sneakers::Worker + from_queue 'default' + + def work(msg) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data + ack! + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb new file mode 100644 index 0000000000..b19a38093f --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -0,0 +1,25 @@ +require 'sucker_punch' + +module ActiveJob + module QueueAdapters + class SuckerPunchAdapter + class << self + def enqueue(job) + JobWrapper.new.async.perform job.serialize + end + + def enqueue_at(job, timestamp) + raise NotImplementedError + end + end + + class JobWrapper + include SuckerPunch::Job + + def perform(job_data) + Base.execute job_data + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb new file mode 100644 index 0000000000..12ef72310d --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -0,0 +1,37 @@ +module ActiveJob + module QueueAdapters + class TestAdapter + delegate :name, to: :class + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs) + attr_writer(:enqueued_jobs, :performed_jobs) + + # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. + def enqueued_jobs + @enqueued_jobs ||= [] + end + + # Provides a store of all the performed jobs with the TestAdapter so you can check them. + def performed_jobs + @performed_jobs ||= [] + end + + def enqueue(job) + if perform_enqueued_jobs + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + job.perform_now + else + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + end + end + + def enqueue_at(job, timestamp) + if perform_enqueued_at_jobs + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + job.perform_now + else + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb new file mode 100644 index 0000000000..45acb71605 --- /dev/null +++ b/activejob/lib/active_job/queue_name.rb @@ -0,0 +1,38 @@ +module ActiveJob + module QueueName + extend ActiveSupport::Concern + + module ClassMethods + mattr_accessor(:queue_name_prefix) + mattr_accessor(:default_queue_name) { "default" } + + def queue_as(part_name=nil, &block) + if block_given? + self.queue_name = block + else + self.queue_name = queue_name_from_part(part_name) + end + end + + def queue_name_from_part(part_name) #:nodoc: + queue_name = part_name.to_s.presence || default_queue_name + name_parts = [queue_name_prefix.presence, queue_name] + name_parts.compact.join('_') + end + end + + included do + class_attribute :queue_name, instance_accessor: false + self.queue_name = default_queue_name + end + + # Returns the name of the queue the job will be run on + def queue_name + if @queue_name.is_a?(Proc) + @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) + end + @queue_name + end + + end +end diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb new file mode 100644 index 0000000000..6538ac1b30 --- /dev/null +++ b/activejob/lib/active_job/railtie.rb @@ -0,0 +1,23 @@ +require 'global_id/railtie' +require 'active_job' + +module ActiveJob + # = Active Job Railtie + class Railtie < Rails::Railtie # :nodoc: + config.active_job = ActiveSupport::OrderedOptions.new + + initializer 'active_job.logger' do + ActiveSupport.on_load(:active_job) { self.logger = ::Rails.logger } + end + + initializer "active_job.set_configs" do |app| + options = app.config.active_job + options.queue_adapter ||= :inline + + ActiveSupport.on_load(:active_job) do + options.each { |k,v| send("#{k}=", v) } + end + end + + end +end diff --git a/activejob/lib/active_job/test_case.rb b/activejob/lib/active_job/test_case.rb new file mode 100644 index 0000000000..d894a7b5cd --- /dev/null +++ b/activejob/lib/active_job/test_case.rb @@ -0,0 +1,7 @@ +require 'active_support/test_case' + +module ActiveJob + class TestCase < ActiveSupport::TestCase + include ActiveJob::TestHelper + end +end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb new file mode 100644 index 0000000000..fa0576669e --- /dev/null +++ b/activejob/lib/active_job/test_helper.rb @@ -0,0 +1,196 @@ +module ActiveJob + # Provides helper methods for testing Active Job + module TestHelper + extend ActiveSupport::Concern + + included do + def before_setup + @old_queue_adapter = queue_adapter + ActiveJob::Base.queue_adapter = :test + clear_enqueued_jobs + clear_performed_jobs + super + end + + def after_teardown + super + ActiveJob::Base.queue_adapter = @old_queue_adapter + end + + # Asserts that the number of enqueued jobs matches the given number. + # + # def test_jobs + # assert_enqueued_jobs 0 + # HelloJob.perform_later('david') + # assert_enqueued_jobs 1 + # HelloJob.perform_later('abdelkader') + # assert_enqueued_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be enqueued. + # + # def test_jobs_again + # assert_enqueued_jobs 1 do + # HelloJob.perform_later('cristian') + # end + # + # assert_enqueued_jobs 2 do + # HelloJob.perform_later('aaron') + # HelloJob.perform_later('rafael') + # end + # end + def assert_enqueued_jobs(number) + if block_given? + original_count = enqueued_jobs.size + yield + new_count = enqueued_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were enqueued" + else + enqueued_jobs_size = enqueued_jobs.size + assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued" + end + end + + # Assert that no job have been enqueued. + # + # def test_jobs + # assert_no_enqueued_jobs + # HelloJob.perform_later('jeremy') + # assert_enqueued_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be enqueued. + # + # def test_jobs_again + # assert_no_enqueued_jobs do + # # No job should be enqueued from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_enqueued_jobs 0 + def assert_no_enqueued_jobs(&block) + assert_enqueued_jobs 0, &block + end + + # Asserts that the number of performed jobs matches the given number. + # + # def test_jobs + # assert_performed_jobs 0 + # HelloJob.perform_later('xavier') + # assert_performed_jobs 1 + # HelloJob.perform_later('yves') + # assert_performed_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be performed. + # + # def test_jobs_again + # assert_performed_jobs 1 do + # HelloJob.perform_later('robin') + # end + # + # assert_performed_jobs 2 do + # HelloJob.perform_later('carlos') + # HelloJob.perform_later('sean') + # end + # end + def assert_performed_jobs(number) + if block_given? + original_count = performed_jobs.size + yield + new_count = performed_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were performed" + else + performed_jobs_size = performed_jobs.size + assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" + end + end + + # Asserts that no jobs have been performed. + # + # def test_jobs + # assert_no_performed_jobs + # HelloJob.perform_later('matthew') + # assert_performed_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be performed. + # + # def test_jobs_again + # assert_no_performed_jobs do + # # No job should be performed from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_performed_jobs 0 + def assert_no_performed_jobs(&block) + assert_performed_jobs 0, &block + end + + # Asserts that the job passed in the block has been enqueued with the given arguments. + # + # def assert_enqueued_job + # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_enqueued_with(args = {}, &_block) + original_enqueued_jobs = enqueued_jobs.dup + clear_enqueued_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + yield + matching_job = enqueued_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job + ensure + queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs + end + + # Asserts that the job passed in the block has been performed with the given arguments. + # + # def test_assert_performed_with + # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_performed_with(args = {}, &_block) + original_performed_jobs = performed_jobs.dup + clear_performed_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + yield + matching_job = performed_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job, "No performed job found with #{args}" + ensure + queue_adapter.performed_jobs = original_performed_jobs + performed_jobs + end + + def queue_adapter + ActiveJob::Base.queue_adapter + end + + delegate :enqueued_jobs, :enqueued_jobs=, + :performed_jobs, :performed_jobs=, + to: :queue_adapter + + private + def clear_enqueued_jobs + enqueued_jobs.clear + end + + def clear_performed_jobs + performed_jobs.clear + end + end + end +end diff --git a/activejob/lib/active_job/version.rb b/activejob/lib/active_job/version.rb new file mode 100644 index 0000000000..971ba9fe0c --- /dev/null +++ b/activejob/lib/active_job/version.rb @@ -0,0 +1,8 @@ +require_relative 'gem_version' + +module ActiveJob + # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt> + def self.version + gem_version + end +end |