From d3ff144f89a1d42118f84f075fa3cc9b94fb190e Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Mon, 19 May 2014 00:47:35 +0300 Subject: Implemented delayed job --- Gemfile | 3 +- Gemfile.lock | 3 + Rakefile | 4 +- .../queue_adapters/delayed_job_adapter.rb | 13 +++ test/adapters/delayed_job.rb | 6 ++ test/cases/adapter_test.rb | 5 + test/helper.rb | 2 +- test/support/delayed_job/delayed/backend/test.rb | 113 +++++++++++++++++++++ .../delayed_job/delayed/serialization/test.rb | 0 9 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 lib/active_job/queue_adapters/delayed_job_adapter.rb create mode 100644 test/adapters/delayed_job.rb create mode 100644 test/support/delayed_job/delayed/backend/test.rb create mode 100644 test/support/delayed_job/delayed/serialization/test.rb diff --git a/Gemfile b/Gemfile index e9b5ee9c8b..f8c97a8cab 100644 --- a/Gemfile +++ b/Gemfile @@ -4,4 +4,5 @@ gemspec gem 'resque' gem 'sidekiq' -gem 'sucker_punch' \ No newline at end of file +gem 'sucker_punch' +gem 'delayed_job' diff --git a/Gemfile.lock b/Gemfile.lock index 51a3812ad9..7005cc4d46 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -16,6 +16,8 @@ GEM celluloid (0.15.2) timers (~> 1.1.0) connection_pool (2.0.0) + delayed_job (4.0.1) + activesupport (>= 3.0, < 4.2) i18n (0.6.9) json (1.8.1) minitest (5.3.4) @@ -58,6 +60,7 @@ PLATFORMS DEPENDENCIES activejob! + delayed_job resque sidekiq sucker_punch diff --git a/Rakefile b/Rakefile index 7f38657160..d0d837602a 100644 --- a/Rakefile +++ b/Rakefile @@ -22,12 +22,12 @@ task :default => :test desc 'Run all adapter tests' task :test do - tasks = %w(test_inline test_resque test_sidekiq test_sucker_punch) + tasks = %w(test_inline test_resque test_sidekiq test_sucker_punch test_delayed_job) run_without_aborting(*tasks) end -%w( inline resque sidekiq sucker_punch).each do |adapter| +%w( inline resque sidekiq sucker_punch delayed_job).each do |adapter| Rake::TestTask.new("test_#{adapter}") do |t| t.libs << 'test' t.test_files = FileList['test/cases/**/*_test.rb'] diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb new file mode 100644 index 0000000000..33229dece4 --- /dev/null +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -0,0 +1,13 @@ +require 'delayed_job' + +module ActiveJob + module QueueAdapters + class DelayedJobAdapter + class << self + def queue(job, *args) + job.delay(queue: job.queue_name).perform(*args) + end + end + end + end +end diff --git a/test/adapters/delayed_job.rb b/test/adapters/delayed_job.rb new file mode 100644 index 0000000000..2055a6b6e6 --- /dev/null +++ b/test/adapters/delayed_job.rb @@ -0,0 +1,6 @@ +require 'delayed_job' +$LOAD_PATH << File.dirname(__FILE__) + "/../support/delayed_job" + +Delayed::Worker.delay_jobs = false +Delayed::Worker.backend = :test +ActiveJob::Base.adapter = :delayed_job diff --git a/test/cases/adapter_test.rb b/test/cases/adapter_test.rb index eac92fb27d..c51130bc15 100644 --- a/test/cases/adapter_test.rb +++ b/test/cases/adapter_test.rb @@ -25,6 +25,11 @@ class AdapterTest < ActiveSupport::TestCase assert_equal ActiveJob::QueueAdapters::SuckerPunchAdapter, ActiveJob::Base.queue_adapter end + test 'should load delayed_job adapter' do + ActiveJob::Base.adapter = :delayed_job + assert_equal ActiveJob::QueueAdapters::DelayedJobAdapter, ActiveJob::Base.queue_adapter + end + def teardown ActiveJob::Base.queue_adapter = @old_adapter end diff --git a/test/helper.rb b/test/helper.rb index df0a2a53f8..fc3e2642df 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -1,7 +1,7 @@ require 'bundler' Bundler.setup -$LOAD_PATH << File.dirname(__FILE__ + "/../lib") +$LOAD_PATH << File.dirname(__FILE__) + "/../lib" require 'active_job' require "adapters/#{ENV['AJADAPTER'] || 'inline'}" diff --git a/test/support/delayed_job/delayed/backend/test.rb b/test/support/delayed_job/delayed/backend/test.rb new file mode 100644 index 0000000000..b50ed36fc2 --- /dev/null +++ b/test/support/delayed_job/delayed/backend/test.rb @@ -0,0 +1,113 @@ +#copied from https://github.com/collectiveidea/delayed_job/blob/master/spec/delayed/backend/test.rb +require 'ostruct' + +# An in-memory backend suitable only for testing. Tries to behave as if it were an ORM. +module Delayed + module Backend + module Test + class Job + attr_accessor :id + attr_accessor :priority + attr_accessor :attempts + attr_accessor :handler + attr_accessor :last_error + attr_accessor :run_at + attr_accessor :locked_at + attr_accessor :locked_by + attr_accessor :failed_at + attr_accessor :queue + + include Delayed::Backend::Base + + cattr_accessor :id + self.id = 0 + + def initialize(hash = {}) + self.attempts = 0 + self.priority = 0 + self.id = (self.class.id += 1) + hash.each{|k,v| send(:"#{k}=", v)} + end + + @jobs = [] + def self.all + @jobs + end + + def self.count + all.size + end + + def self.delete_all + all.clear + end + + def self.create(attrs = {}) + new(attrs).tap do |o| + o.save + end + end + + def self.create!(*args); create(*args); end + + def self.clear_locks!(worker_name) + all.select{|j| j.locked_by == worker_name}.each {|j| j.locked_by = nil; j.locked_at = nil} + end + + # Find a few candidate jobs to run (in case some immediately get locked by others). + def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) + jobs = all.select do |j| + j.run_at <= db_time_now && + (j.locked_at.nil? || j.locked_at < db_time_now - max_run_time || j.locked_by == worker_name) && + !j.failed? + end + + jobs = jobs.select{|j| Worker.queues.include?(j.queue)} if Worker.queues.any? + jobs = jobs.select{|j| j.priority >= Worker.min_priority} if Worker.min_priority + jobs = jobs.select{|j| j.priority <= Worker.max_priority} if Worker.max_priority + jobs.sort_by{|j| [j.priority, j.run_at]}[0..limit-1] + end + + # Lock this job for this worker. + # Returns true if we have the lock, false otherwise. + def lock_exclusively!(max_run_time, worker) + now = self.class.db_time_now + if locked_by != worker + # We don't own this job so we will update the locked_by name and the locked_at + self.locked_at = now + self.locked_by = worker + end + + return true + end + + def self.db_time_now + Time.current + end + + def update_attributes(attrs = {}) + attrs.each{|k,v| send(:"#{k}=", v)} + save + end + + def destroy + self.class.all.delete(self) + end + + def save + self.run_at ||= Time.current + + self.class.all << self unless self.class.all.include?(self) + true + end + + def save!; save; end + + def reload + reset + self + end + end + end + end +end diff --git a/test/support/delayed_job/delayed/serialization/test.rb b/test/support/delayed_job/delayed/serialization/test.rb new file mode 100644 index 0000000000..e69de29bb2 -- cgit v1.2.3 From 6e18a7469fb43714c8a37e6cf6ff94280cd12465 Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Mon, 19 May 2014 11:47:12 +0300 Subject: Modified readme --- README.rdoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rdoc b/README.rdoc index 8531988f33..c83a5b2986 100644 --- a/README.rdoc +++ b/README.rdoc @@ -18,10 +18,10 @@ We currently have adapters for: * Resque 1.x * Sidekiq * Sucker Punch +* Delayed Job We would like to have adapters for: -* Delayed Job * beanstalkd * rabbitmq -- cgit v1.2.3