aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/test/support/integration/adapters/sneakers.rb
blob: 875803a2d8d5bb881d889a87563364df548a0015 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
require 'sneakers/runner'
require 'sneakers/publisher'
require 'timeout'

# Test-only extension of the sneakers gem's Publisher.
module Sneakers
  class Publisher
    # Establishes the publisher connection if it is not already up, holding
    # @mutex for the duration of the check-and-connect.
    #
    # Relies on @mutex, #connected? and #ensure_connection! being provided by
    # the gem's own Publisher class (not visible in this file).
    #
    # Returns nil when already connected, otherwise whatever
    # #ensure_connection! returns.
    def safe_ensure_connected
      @mutex.synchronize do
        next if connected?

        ensure_connection!
      end
    end
  end
end


# Test-support mixin that configures the Sneakers queue adapter for the
# Active Job integration tests and starts/stops a Sneakers worker process.
#
# Expects Rails, ActiveJob and Sneakers to be loaded by the including test
# environment.
module SneakersJobsManager
  # Selects the :sneakers queue adapter and configures Sneakers for a local
  # RabbitMQ broker. If the publisher cannot connect (see #can_run?), prints
  # a notice and exits the process instead of running the tests.
  def setup
    ActiveJob::Base.queue_adapter = :sneakers
    Sneakers.configure  heartbeat: 2,
                        amqp: 'amqp://guest:guest@localhost:5672',
                        vhost: '/',
                        exchange: 'active_jobs_sneakers_int_test',
                        exchange_type: :direct,
                        daemonize: true,
                        threads: 1,
                        workers: 1,
                        pid_path: sneakers_pid_file,
                        log: Rails.root.join("log/sneakers.log").to_s
    unless can_run?
      puts "Cannot run integration tests for sneakers. To be able to run integration tests for sneakers you need to install and start rabbitmq.\n"
      exit
    end
  end

  # Removes all messages from the integration test queue.
  def clear_jobs
    bunny_queue.purge
  end

  # Forks a process that runs a Sneakers worker consuming from the
  # "integration_tests" queue, then polls the queue status every 0.5s for up
  # to 10s until at least one consumer is reported.
  #
  # Raises RuntimeError if no consumer shows up in time; the workers are
  # stopped before raising.
  def start_workers
    @pid = fork do
      queues = %w(integration_tests)
      workers = queues.map do |q|
        # One JobWrapper subclass per queue, registered under a name derived
        # from the queue's MD5 digest.
        worker_klass = "ActiveJobWorker#{Digest::MD5.hexdigest(q)}"
        Sneakers.const_set(worker_klass, Class.new(ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper) do
          from_queue q
        end)
      end
      Sneakers::Runner.new(workers).run
    end
    begin
      Timeout.timeout(10) do
        sleep 0.5 while bunny_queue.status[:consumer_count] == 0
      end
    rescue Timeout::Error
      stop_workers
      raise "Failed to start sneakers worker"
    end
  end

  # Best-effort shutdown: sends TERM to the forked process and to the pid
  # recorded in the Sneakers pid file. Each kill is attempted independently,
  # so a failure on one (unset pid, process already gone, missing pid file)
  # does not prevent the other. Never raises for those cases.
  def stop_workers
    terminate_process(@pid)
    terminate_process(sneakers_daemon_pid)
  end

  # Returns true when a connected publisher can be obtained, false when
  # obtaining or connecting it raises a StandardError.
  def can_run?
    bunny_publisher
    true
  rescue StandardError
    false
  end

  protected
    # Path of the pid file passed to Sneakers as :pid_path.
    # NOTE(review): presumably written by the daemonized Sneakers runner —
    # confirm against the sneakers gem.
    def sneakers_pid_file
      Rails.root.join("tmp/sneakers.pid").to_s
    end

    # Pid read from the Sneakers pid file, or nil when the file cannot be
    # read. File.read closes the file, unlike File.open(...).read.
    def sneakers_daemon_pid
      File.read(sneakers_pid_file).to_i
    rescue SystemCallError
      nil
    end

    # Sends TERM to +pid+, ignoring processes that no longer exist or cannot
    # be signalled. Non-positive or non-integer pids are skipped on purpose:
    # Process.kill with pid 0 (what an empty pid file yields via to_i) would
    # signal the whole process group, including the test runner itself.
    def terminate_process(pid)
      return unless pid.is_a?(Integer) && pid > 0

      Process.kill 'TERM', pid
    rescue SystemCallError, RangeError
      nil
    end

    # Memoized publisher taken from the adapter's JobWrapper, connected on
    # first use.
    def bunny_publisher
      @bunny_publisher ||= begin
        publisher = ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper.send(:publisher)
        publisher.safe_ensure_connected
        publisher
      end
    end

    # Memoized handle to the durable "integration_tests" queue, obtained
    # through the publisher's exchange channel.
    def bunny_queue
      @queue ||= bunny_publisher.exchange.channel.queue "integration_tests", durable: true
    end

end