require 'test_helper'
require 'concurrent'
require 'action_cable/process/logging'
require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'
module CommonSubscriptionAdapterTest
WAIT_WHEN_EXPECTING_EVENT = 3
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
def setup
# TODO: ActionCable requires a *lot* of setup at the moment...
::Object.const_set(:ApplicationCable, Module.new)
::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
::Object.const_set(:Rails, Module.new)
::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
server = ActionCable::Server::Base.new
server.config = ActionCable::Server::Configuration.new
inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
# and now the "real" setup for our test:
spawn_eventmachine
server.config.cable = cable_config.with_indifferent_access
adapter_klass = server.config.pubsub_adapter
@rx_adapter = adapter_klass.new(server)
@tx_adapter = adapter_klass.new(server)
end
def teardown
@tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
@rx_adapter.shutdown if @rx_adapter
begin
::Object.send(:remove_const, :ApplicationCable)
rescue NameError
end
begin
::Object.send(:remove_const, :Rails)
rescue NameError
end
end
def subscribe_as_queue(channel, adapter = @rx_adapter)
queue = Queue.new
callback = -> data { queue << data }
subscribed = Concurrent::Event.new
adapter.subscribe(channel, callback, Proc.new { subscribed.set })
subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
assert subscribed.set?
yield queue
sleep WAIT_WHEN_NOT_EXPECTING_EVENT
assert_empty queue
ensure
adapter.unsubscribe(channel, callback) if subscribed.set?
end
def test_subscribe_and_unsubscribe
subscribe_as_queue('channel') do |queue|
end
end
def test_basic_broadcast
subscribe_as_queue('channel') do |queue|
@tx_adapter.broadcast('channel', 'hello world')
assert_equal 'hello world', queue.pop
end
end
def test_broadcast_after_unsubscribe
keep_queue = nil
subscribe_as_queue('channel') do |queue|
keep_queue = queue
@tx_adapter.broadcast('channel', 'hello world')
assert_equal 'hello world', queue.pop
end
@tx_adapter.broadcast('channel', 'hello void')
sleep WAIT_WHEN_NOT_EXPECTING_EVENT
assert_empty keep_queue
end
def test_multiple_broadcast
subscribe_as_queue('channel') do |queue|
@tx_adapter.broadcast('channel', 'bananas')
@tx_adapter.broadcast('channel', 'apples')
received = []
2.times { received << queue.pop }
assert_equal ['apples', 'bananas'], received.sort
end
end
def test_identical_subscriptions
subscribe_as_queue('channel') do |queue|
subscribe_as_queue('channel') do |queue_2|
@tx_adapter.broadcast('channel', 'hello')
assert_equal 'hello', queue_2.pop
end
assert_equal 'hello', queue.pop
end
end
def test_simultaneous_subscriptions
subscribe_as_queue('channel') do |queue|
subscribe_as_queue('other channel') do |queue_2|
@tx_adapter.broadcast('channel', 'apples')
@tx_adapter.broadcast('other channel', 'oranges')
assert_equal 'apples', queue.pop
assert_equal 'oranges', queue_2.pop
end
end
end
def test_channel_filtered_broadcast
subscribe_as_queue('channel') do |queue|
@tx_adapter.broadcast('other channel', 'one')
@tx_adapter.broadcast('channel', 'two')
assert_equal 'two', queue.pop
end
end
end