1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
require 'test_helper'
require 'concurrent'
require 'action_cable/process/logging'
require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'
# Shared test cases for Action Cable pub/sub subscription adapters.
#
# Meant to be mixed into a test case. +setup+ calls +cable_config+ and
# +spawn_eventmachine+, neither of which is defined here, so the including
# test case (or something it loads) is expected to provide them.
#
# Every test talks to two adapter instances built from the same server:
# @rx_adapter receives (subscribes) and @tx_adapter sends (broadcasts).
module CommonSubscriptionAdapterTest
  # Upper bound, in seconds, on waiting for a subscription to be confirmed.
  WAIT_WHEN_EXPECTING_EVENT = 3
  # Grace period, in seconds, given to stray messages before asserting that
  # nothing (more) was delivered.
  WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2

  # Stubs the top-level ApplicationCable::Connection and Rails.root, builds a
  # server whose logger is silenced, and creates the two adapters under test
  # from the pubsub adapter class selected by +cable_config+.
  def setup
    # TODO: ActionCable requires a *lot* of setup at the moment...
    ::Object.const_set(:ApplicationCable, Module.new)
    ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))

    ::Object.const_set(:Rails, Module.new)
    ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }

    server = ActionCable::Server::Base.new
    server.config = ActionCable::Server::Configuration.new
    # Log into a throwaway StringIO at the highest threshold so test output stays quiet.
    inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
    server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])

    # and now the "real" setup for our test:
    spawn_eventmachine
    server.config.cable = cable_config.with_indifferent_access

    adapter_klass = server.config.pubsub_adapter

    @rx_adapter = adapter_klass.new(server)
    @tx_adapter = adapter_klass.new(server)
  end

  # Shuts both adapters down and removes the constants stubbed by +setup+.
  #
  # Each step runs even when an earlier one raises, so a failing adapter
  # shutdown can neither leave the other adapter running nor leak the stubbed
  # ApplicationCable / Rails constants into later tests. The error itself
  # still propagates to the caller.
  def teardown
    begin
      # Skip the sender when it is the same object as the receiver, so a
      # shared adapter is only shut down once (below).
      @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
    ensure
      begin
        @rx_adapter.shutdown if @rx_adapter
      ensure
        remove_top_level_constant(:ApplicationCable)
        remove_top_level_constant(:Rails)
      end
    end
  end

  # Subscribes to +channel+ on +adapter+ and yields a Queue that receives
  # every message delivered to that subscription.
  #
  # The subscription must be confirmed within WAIT_WHEN_EXPECTING_EVENT
  # seconds. After the block returns, the queue must still be empty once
  # WAIT_WHEN_NOT_EXPECTING_EVENT seconds have passed, i.e. the block has to
  # pop every message it expects. A confirmed subscription is always removed
  # again, even when the block or an assertion raises.
  #
  # channel - name of the channel to subscribe to.
  # adapter - adapter to subscribe on (defaults to @rx_adapter).
  def subscribe_as_queue(channel, adapter = @rx_adapter)
    queue = Queue.new

    callback = ->(data) { queue << data }
    subscribed = Concurrent::Event.new
    adapter.subscribe(channel, callback, Proc.new { subscribed.set })
    subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
    assert subscribed.set?

    yield queue

    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    assert_empty queue
  ensure
    adapter.unsubscribe(channel, callback) if subscribed.set?
  end

  # Subscribing and unsubscribing alone must succeed and deliver nothing.
  def test_subscribe_and_unsubscribe
    subscribe_as_queue('channel') do |_queue|
      # Intentionally empty: the helper itself performs all the assertions.
    end
  end

  # A broadcast reaches a subscriber of the same channel.
  def test_basic_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('channel', 'hello world')

      assert_equal 'hello world', queue.pop
    end
  end

  # Nothing is delivered to a callback once it has been unsubscribed.
  def test_broadcast_after_unsubscribe
    keep_queue = nil
    subscribe_as_queue('channel') do |queue|
      keep_queue = queue

      @tx_adapter.broadcast('channel', 'hello world')

      assert_equal 'hello world', queue.pop
    end

    @tx_adapter.broadcast('channel', 'hello void')

    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    assert_empty keep_queue
  end

  # Several broadcasts all arrive; arrival order is deliberately not asserted.
  def test_multiple_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('channel', 'bananas')
      @tx_adapter.broadcast('channel', 'apples')

      received = []
      2.times { received << queue.pop }
      assert_equal ['apples', 'bananas'], received.sort
    end
  end

  # Two subscriptions to the same channel each get their own copy, and
  # removing the inner one leaves the outer one intact.
  def test_identical_subscriptions
    subscribe_as_queue('channel') do |queue|
      subscribe_as_queue('channel') do |queue_2|
        @tx_adapter.broadcast('channel', 'hello')

        assert_equal 'hello', queue_2.pop
      end

      assert_equal 'hello', queue.pop
    end
  end

  # Concurrent subscriptions to different channels only see their own traffic.
  def test_simultaneous_subscriptions
    subscribe_as_queue('channel') do |queue|
      subscribe_as_queue('other channel') do |queue_2|
        @tx_adapter.broadcast('channel', 'apples')
        @tx_adapter.broadcast('other channel', 'oranges')

        assert_equal 'apples', queue.pop
        assert_equal 'oranges', queue_2.pop
      end
    end
  end

  # A broadcast on another channel is not delivered to this subscription.
  def test_channel_filtered_broadcast
    subscribe_as_queue('channel') do |queue|
      @tx_adapter.broadcast('other channel', 'one')
      @tx_adapter.broadcast('channel', 'two')

      assert_equal 'two', queue.pop
    end
  end

  private

  # Best-effort removal of a constant defined directly on Object.
  def remove_top_level_constant(name)
    ::Object.send(:remove_const, name)
  rescue NameError
    # The constant is not defined (e.g. setup failed before creating it or it
    # was already removed), so there is nothing to clean up.
  end
end
|