1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# frozen_string_literal: true
require "test_helper"
require "concurrent"
require "active_support/core_ext/hash/indifferent_access"
require "pathname"
# Shared test cases for Action Cable pub/sub subscription adapters.
#
# Mix this module into a test class that defines +cable_config+ (read by
# #setup) and that supplies the assertion helpers used below
# (+assert_equal+, +assert_empty+, +assert_predicate+).
module CommonSubscriptionAdapterTest
  # Seconds to wait for an event that is expected to happen.
  WAIT_WHEN_EXPECTING_EVENT = 3
  # Seconds to linger before concluding that no further event is coming.
  WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2

  # Builds a standalone server from the host's +cable_config+, routes its log
  # output to an in-memory sink, and instantiates the configured pub/sub
  # adapter class twice: once for receiving and once for transmitting.
  def setup
    server = ActionCable::Server::Base.new
    server.config.cable = cable_config.with_indifferent_access

    quiet_logger = Logger.new(StringIO.new)
    quiet_logger.level = Logger::UNKNOWN
    server.config.logger = quiet_logger

    adapter_klass = server.config.pubsub_adapter
    @rx_adapter = adapter_klass.new(server)
    @tx_adapter = adapter_klass.new(server)
  end

  # Shuts down each distinct, non-nil adapter created by #setup.
  def teardown
    [@rx_adapter, @tx_adapter].uniq.compact.each { |adapter| adapter.shutdown }
  end

  # Subscribes to +channel+ on +adapter+ and yields a Queue that receives
  # every message delivered to the subscription.
  #
  # The subscription must be confirmed within WAIT_WHEN_EXPECTING_EVENT
  # seconds. After the block returns, the helper lingers for
  # WAIT_WHEN_NOT_EXPECTING_EVENT seconds and asserts that the block consumed
  # everything that arrived. The subscription is removed afterwards, even
  # when the block raises, provided it was confirmed.
  def subscribe_as_queue(channel, adapter = @rx_adapter)
    inbox = Queue.new
    on_message = ->(payload) { inbox << payload }
    confirmed = Concurrent::Event.new

    adapter.subscribe(channel, on_message, proc { confirmed.set })
    confirmed.wait(WAIT_WHEN_EXPECTING_EVENT)
    assert_predicate confirmed, :set?

    yield inbox

    # Give stray messages a chance to show up before checking for leftovers.
    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    assert_empty inbox
  ensure
    if confirmed.set?
      adapter.unsubscribe(channel, on_message)
    end
  end

  # A subscription can be established and torn down without any traffic.
  def test_subscribe_and_unsubscribe
    subscribe_as_queue("channel") { }
  end

  # A single broadcast reaches a subscriber on the same channel.
  def test_basic_broadcast
    subscribe_as_queue("channel") do |inbox|
      @tx_adapter.broadcast("channel", "hello world")

      assert_equal "hello world", inbox.pop
    end
  end

  # Messages broadcast after unsubscribing are not delivered.
  def test_broadcast_after_unsubscribe
    retained = nil

    subscribe_as_queue("channel") do |inbox|
      retained = inbox
      @tx_adapter.broadcast("channel", "hello world")

      assert_equal "hello world", inbox.pop
    end

    @tx_adapter.broadcast("channel", "hello void")
    sleep WAIT_WHEN_NOT_EXPECTING_EVENT

    assert_empty retained
  end

  # Two broadcasts both arrive; the comparison is order-insensitive.
  def test_multiple_broadcast
    subscribe_as_queue("channel") do |inbox|
      @tx_adapter.broadcast("channel", "bananas")
      @tx_adapter.broadcast("channel", "apples")

      arrived = Array.new(2) { inbox.pop }

      assert_equal ["apples", "bananas"], arrived.sort
    end
  end

  # Two subscriptions to the same channel each receive the broadcast.
  def test_identical_subscriptions
    subscribe_as_queue("channel") do |outer|
      subscribe_as_queue("channel") do |inner|
        @tx_adapter.broadcast("channel", "hello")

        assert_equal "hello", inner.pop
      end

      assert_equal "hello", outer.pop
    end
  end

  # Concurrent subscriptions to different channels stay separate.
  def test_simultaneous_subscriptions
    subscribe_as_queue("channel") do |first|
      subscribe_as_queue("other channel") do |second|
        @tx_adapter.broadcast("channel", "apples")
        @tx_adapter.broadcast("other channel", "oranges")

        assert_equal "apples", first.pop
        assert_equal "oranges", second.pop
      end
    end
  end

  # A subscriber only sees messages for its own channel.
  def test_channel_filtered_broadcast
    subscribe_as_queue("channel") do |inbox|
      @tx_adapter.broadcast("other channel", "one")
      @tx_adapter.broadcast("channel", "two")

      assert_equal "two", inbox.pop
    end
  end

  # Channel names that share a 100-character prefix and differ only in the
  # final character are still told apart.
  def test_long_identifiers
    channel_1, channel_2 = ["1", "2"].map { |suffix| "a" * 100 + suffix }

    subscribe_as_queue(channel_1) do |first|
      subscribe_as_queue(channel_2) do |second|
        @tx_adapter.broadcast(channel_1, "apples")
        @tx_adapter.broadcast(channel_2, "oranges")

        assert_equal "apples", first.pop
        assert_equal "oranges", second.pop
      end
    end
  end
end
|