aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/test/queueing/test_queue_test.rb
blob: 451fb68d3ec310ab91e173cca61ec276e4e34042 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
require 'abstract_unit'
require 'active_support/queueing'

# Tests for ActiveSupport::TestQueue: pushing, popping, observing and
# draining jobs. A "job" here is any object pushed onto the queue; the
# helper job classes below all respond to #run.
class TestQueueTest < ActiveSupport::TestCase
  # Every test starts with its own freshly constructed queue.
  def setup
    @queue = ActiveSupport::TestQueue.new
  end

  # Job whose #run always fails.
  class ExceptionRaisingJob
    def run
      # A bare `raise` with no exception currently being handled raises
      # RuntimeError, which is what the test below asserts on.
      raise
    end
  end

  # Draining must surface an exception raised by a job to the caller.
  def test_drain_raises_exceptions_from_running_jobs
    @queue.push ExceptionRaisingJob.new
    assert_raises(RuntimeError) { @queue.drain }
  end

  # #jobs lists pushed objects in the order they were pushed.
  def test_jobs
    @queue.push 1
    @queue.push 2
    assert_equal [1, 2], @queue.jobs
  end

  # Job that remembers the object_id it had when constructed and compares
  # equal to any object carrying the same remembered id.
  # NOTE(review): presumably this lets a copy of the job handed back by the
  # queue compare equal to the original -- confirm against TestQueue.
  class EquivalentJob
    def initialize
      @initial_id = object_id
    end

    def run
    end

    # Equal when +other+ remembers the same initial id. Objects that do not
    # implement #same_initial_id? (nil, unrelated types) are simply unequal
    # rather than raising NoMethodError.
    def ==(other)
      other.respond_to?(:same_initial_id?) && other.same_initial_id?(@initial_id)
    end

    # True when +other_id+ matches the id recorded at construction.
    def same_initial_id?(other_id)
      other_id == @initial_id
    end
  end

  # The queue reports emptiness correctly and hands back a job equal to the
  # one that was pushed.
  def test_contents
    job = EquivalentJob.new
    assert @queue.empty?
    @queue.push job
    refute @queue.empty?
    assert_equal job, @queue.pop
  end

  # Job that appends its payload to a class-level list when run, so a test
  # can inspect the order in which jobs executed.
  class ProcessingJob
    # Resets the class-level record of processed payloads.
    def self.clear_processed
      @processed = []
    end

    # Payloads recorded so far, in run order. Lazily initialised so that
    # #run works even if .clear_processed was never called.
    def self.processed
      @processed ||= []
    end

    def initialize(object)
      @object = object
    end

    def run
      self.class.processed << @object
    end
  end

  # Jobs run in the order in which they were pushed.
  def test_order
    ProcessingJob.clear_processed
    first_job = ProcessingJob.new(1)
    second_job = ProcessingJob.new(2)

    @queue.push first_job
    @queue.push second_job
    @queue.drain

    assert_equal [1, 2], ProcessingJob.processed
  end

  # Job that records the object_id of the thread that executed it.
  class ThreadTrackingJob
    attr_reader :thread_id

    def run
      @thread_id = Thread.current.object_id
    end

    # True once #run has recorded a thread id.
    def ran?
      !@thread_id.nil?
    end
  end

  # Draining empties the queue and runs the job on the calling thread.
  def test_drain
    @queue.push ThreadTrackingJob.new
    # Inspect the queue's own instance of the job rather than the object
    # that was passed to #push.
    job = @queue.jobs.last
    @queue.drain

    assert @queue.empty?
    assert job.ran?, "The job runs synchronously when the queue is drained"
    # Minitest's signature is assert_equal(expected, actual).
    assert_equal Thread.current.object_id, job.thread_id
  end

  # Job identified by an explicit id; two jobs with the same id are equal.
  class IdentifiableJob
    def initialize(id)
      @id = id
    end

    # Equal when +other+ carries the same id. Objects that do not implement
    # #same_id? are unequal rather than raising NoMethodError.
    def ==(other)
      other.respond_to?(:same_id?) && other.same_id?(@id)
    end

    # True when +other_id+ matches this job's id.
    def same_id?(other_id)
      other_id == @id
    end

    def run
    end
  end

  # #jobs exposes every pushed job, in push order.
  def test_queue_can_be_observed
    jobs = (1..10).map { |id| IdentifiableJob.new(id) }

    jobs.each { |job| @queue.push job }

    assert_equal jobs, @queue.jobs
  end

  # Pushing an instance of an anonymous Struct class must raise TypeError.
  # NOTE(review): the test name attributes this to the job not being
  # marshallable; presumably the queue marshals jobs on push -- confirm
  # against TestQueue.
  def test_adding_an_unmarshallable_job
    anonymous_class_instance = Struct.new(:run).new

    assert_raises TypeError do
      @queue.push anonymous_class_instance
    end
  end

  # Pushing a job that holds a reference to the queue itself must raise
  # TypeError.
  def test_attempting_to_add_a_reference_to_itself
    job = {reference: @queue}
    assert_raises TypeError do
      @queue.push job
    end
  end
end