# path: root/actioncable/test/subscription_adapter/common.rb
# blob: 361858784e60f4914ac5d2a3ec7791256d7f78e3
require 'test_helper'
require 'concurrent'

require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'

# Shared test cases for Action Cable subscription adapters.
#
# Include this module in a test case that responds to +cable_config+ (it is
# called from #setup and is not defined here). Each test works with two
# adapter instances built from that configuration: <tt>@rx_adapter</tt> is
# the default subscriber and <tt>@tx_adapter</tt> is used for broadcasting.
module CommonSubscriptionAdapterTest
  # Seconds to wait for an event that is expected to happen.
  WAIT_WHEN_EXPECTING_EVENT = 3

  # Seconds to wait before asserting that no (further) event has arrived.
  WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2

  # Defines stub ::ApplicationCable and ::Rails constants, builds a server
  # with a silenced logger, applies +cable_config+, and instantiates the two
  # adapters under test from the server's configured pubsub adapter class.
  def setup
    # TODO: ActionCable requires a *lot* of setup at the moment...
    ::Object.const_set(:ApplicationCable, Module.new)
    ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))

    ::Object.const_set(:Rails, Module.new)
    ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }

    server = ActionCable::Server::Base.new
    server.config = ActionCable::Server::Configuration.new
    # The logger writes to a throwaway StringIO at the highest severity
    # threshold, so regular log output from the adapters is discarded.
    inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
    server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])

    # and now the "real" setup for our test:
    server.config.cable = cable_config.with_indifferent_access

    adapter_klass = server.config.pubsub_adapter

    @rx_adapter = adapter_klass.new(server)
    @tx_adapter = adapter_klass.new(server)
  end

  # Shuts down both adapters and removes the constants stubbed by #setup.
  #
  # Every cleanup step runs even when an earlier one raises, so a failing
  # shutdown cannot leak the other adapter or leave ::ApplicationCable /
  # ::Rails defined for the next test. The exception still propagates.
  def teardown
    begin
      # Skip the tx shutdown when both variables hold the same adapter, so it
      # is only shut down once (by the rx branch below).
      @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
    ensure
      @rx_adapter.shutdown if @rx_adapter
    end
  ensure
    # The constants may be absent if #setup failed part-way through.
    [:ApplicationCable, :Rails].each do |name|
      ::Object.send(:remove_const, name) if ::Object.const_defined?(name, false)
    end
  end

  # Subscribes to +channel+ on +adapter+ and yields a Queue that receives
  # every message delivered to that subscription.
  #
  # Asserts that the subscription is confirmed within
  # WAIT_WHEN_EXPECTING_EVENT seconds. After the block returns, waits
  # WAIT_WHEN_NOT_EXPECTING_EVENT seconds and asserts that the queue is empty,
  # i.e. the block consumed everything that was delivered. The subscription
  # is removed afterwards if it had been confirmed.
  def subscribe_as_queue(channel, adapter = @rx_adapter)
    queue = Queue.new

    callback = -> data { queue << data }
    subscribed = Concurrent::Event.new
    adapter.subscribe(channel, callback, Proc.new { subscribed.set })
    subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
    assert subscribed.set?

    yield queue

    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    assert_empty queue
  ensure
    # +subscribed+ is nil if an error occurred before it was assigned; guard
    # against that so the original exception is not masked by a NoMethodError.
    adapter.unsubscribe(channel, callback) if subscribed && subscribed.set?
  end

  # A subscription can be established and removed without any traffic.
  def test_subscribe_and_unsubscribe
    subscribe_as_queue('channel') do |_queue|
      # Nothing to do: subscribe_as_queue itself asserts the confirmation and
      # that no message was delivered.
    end
  end

  # A message broadcast on a channel reaches its subscriber.
  def test_basic_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('channel', 'hello world')

      assert_equal 'hello world', queue.pop
    end
  end

  # Messages broadcast after unsubscribing are not delivered.
  def test_broadcast_after_unsubscribe
    keep_queue = nil
    subscribe_as_queue('channel') do |queue|
      keep_queue = queue

      @tx_adapter.broadcast('channel', 'hello world')

      assert_equal 'hello world', queue.pop
    end

    @tx_adapter.broadcast('channel', 'hello void')

    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    assert_empty keep_queue
  end

  # Several broadcasts on one channel are all delivered. The results are
  # sorted before comparison, so delivery order is not asserted.
  def test_multiple_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('channel', 'bananas')
      @tx_adapter.broadcast('channel', 'apples')

      received = []
      2.times { received << queue.pop }
      assert_equal ['apples', 'bananas'], received.sort
    end
  end

  # Two subscriptions to the same channel each receive the broadcast.
  def test_identical_subscriptions
    subscribe_as_queue('channel') do |queue|
      subscribe_as_queue('channel') do |queue_2|
        @tx_adapter.broadcast('channel', 'hello')

        assert_equal 'hello', queue_2.pop
      end

      assert_equal 'hello', queue.pop
    end
  end

  # Concurrent subscriptions to different channels each receive only their
  # own channel's message.
  def test_simultaneous_subscriptions
    subscribe_as_queue('channel') do |queue|
      subscribe_as_queue('other channel') do |queue_2|
        @tx_adapter.broadcast('channel', 'apples')
        @tx_adapter.broadcast('other channel', 'oranges')

        assert_equal 'apples', queue.pop
        assert_equal 'oranges', queue_2.pop
      end
    end
  end

  # A subscriber does not receive broadcasts made on a different channel.
  def test_channel_filtered_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('other channel', 'one')
      @tx_adapter.broadcast('channel', 'two')

      assert_equal 'two', queue.pop
    end
  end
end