# Source: activejob/test/support/integration/adapters/sneakers.rb
# (blob 08743c1f05bfed759d26247e9f1c04bc36ec3b8c)
require "sneakers/runner"
require "sneakers/publisher"
require "timeout"

module Sneakers
  # Reopens Sneakers::Publisher to add a connection helper used by the
  # integration-test support code.
  class Publisher
    # Calls +ensure_connection!+ unless +connected?+ already reports true,
    # performing both the check and the connect while holding +@mutex+.
    #
    # Returns whatever +ensure_connection!+ returns, or nil when the
    # publisher was already connected.
    def safe_ensure_connected
      @mutex.synchronize { ensure_connection! unless connected? }
    end
  end
end

# Test-support mixin that runs ActiveJob's integration tests against the
# Sneakers adapter: it configures Sneakers, starts and stops a worker
# process, and purges the "integration_tests" queue between runs.
module SneakersJobsManager
  # Selects the :sneakers queue adapter and configures Sneakers for a local
  # broker. If no publisher connection can be established (see #can_run?),
  # prints an explanation and terminates the process via Kernel#exit.
  def setup
    ActiveJob::Base.queue_adapter = :sneakers
    Sneakers.configure(
      heartbeat: 2,
      amqp: "amqp://guest:guest@localhost:5672",
      vhost: "/",
      exchange: "active_jobs_sneakers_int_test",
      exchange_type: :direct,
      daemonize: true,
      threads: 1,
      workers: 1,
      pid_path: Rails.root.join("tmp/sneakers.pid").to_s,
      log: Rails.root.join("log/sneakers.log").to_s
    )
    unless can_run?
      puts "Cannot run integration tests for sneakers. To be able to run integration tests for sneakers you need to install and start rabbitmq.\n"
      exit
    end
  end

  # Removes all messages currently sitting in the integration test queue.
  def clear_jobs
    bunny_queue.purge
  end

  # Forks a child that defines one JobWrapper subclass per queue and hands
  # them to Sneakers::Runner, then polls the queue (every 0.5s, for up to
  # 10s) until it reports at least one consumer.
  #
  # Raises RuntimeError ("Failed to start sneakers worker") after calling
  # #stop_workers if no consumer shows up within the timeout.
  def start_workers
    @pid = fork do
      queues = %w(integration_tests)
      workers = queues.map do |q|
        # The MD5 of the queue name is used as the constant-name suffix.
        worker_klass = "ActiveJobWorker#{Digest::MD5.hexdigest(q)}"
        Sneakers.const_set(worker_klass, Class.new(ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper) do
          from_queue q
        end)
      end
      Sneakers::Runner.new(workers).run
    end
    begin
      Timeout.timeout(10) do
        sleep 0.5 while bunny_queue.status[:consumer_count] == 0
      end
    rescue Timeout::Error
      stop_workers
      raise "Failed to start sneakers worker"
    end
  end

  # Best-effort shutdown: sends TERM to the forked child (@pid) and to the
  # pid recorded in tmp/sneakers.pid (the pid_path configured in #setup).
  #
  # Each signal is attempted independently, so a stale or missing @pid does
  # not prevent the pid-file process from being signalled. Never raises a
  # StandardError; always returns nil.
  def stop_workers
    daemon_pid =
      begin
        # File.read closes the handle; a missing/unreadable file just means
        # there is no daemon pid to signal.
        File.read(Rails.root.join("tmp/sneakers.pid").to_s).to_i
      rescue StandardError
        nil
      end

    [@pid, daemon_pid].each do |pid|
      # An empty or garbage pid file yields 0 from to_i. Signalling pid 0
      # (or a negative pid) targets a whole process group -- including the
      # test runner itself -- so only strictly positive pids are signalled.
      next unless pid.is_a?(Integer) && pid > 0

      begin
        Process.kill "TERM", pid
      rescue StandardError
        # Deliberately ignored: the process may already have exited.
      end
    end
    nil
  end

  # Returns true when a connected publisher can be obtained, false if
  # obtaining it raises any StandardError.
  def can_run?
    bunny_publisher
    true
  rescue StandardError
    false
  end

  protected
    # Memoized publisher taken from the Sneakers adapter's JobWrapper, with
    # safe_ensure_connected called on it before it is cached. Nothing is
    # cached if connecting raises.
    def bunny_publisher
      @bunny_publisher ||= begin
        publisher = ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper.send(:publisher)
        publisher.safe_ensure_connected
        publisher
      end
    end

    # Memoized durable "integration_tests" queue, declared on the channel of
    # the publisher's exchange.
    def bunny_queue
      @queue ||= bunny_publisher.exchange.channel.queue "integration_tests", durable: true
    end
end