aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb2
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb5
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb26
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb71
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb7
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb53
-rw-r--r--actioncable/test/subscription_adapter/async_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/common.rb142
-rw-r--r--actioncable/test/subscription_adapter/inline_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/postgresql_test.rb32
-rw-r--r--actioncable/test/subscription_adapter/redis_test.rb10
-rw-r--r--actioncable/test/test_helper.rb4
-rw-r--r--actioncable/test/worker_test.rb5
15 files changed, 380 insertions, 37 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 589946c3db..e2876ef6fa 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,7 +76,7 @@ module ActionCable
streams << [ broadcasting, callback ]
EM.next_tick do
- pubsub.subscribe(broadcasting, callback, lambda do |reply|
+ pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
index e770f4fb00..72e62f3daf 100644
--- a/actioncable/lib/action_cable/subscription_adapter.rb
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -1,5 +1,8 @@
module ActionCable
module SubscriptionAdapter
- autoload :Base, 'action_cable/subscription_adapter/base'
+ extend ActiveSupport::Autoload
+
+ autoload :Base
+ autoload :SubscriberMap
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..85d4892e4c
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def subscriber_map
+ @subscriber_map ||= AsyncSubscriberMap.new
+ end
+
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ ::EM.next_tick { super }
+ end
+
+ def invoke_callback(*)
+ ::EM.next_tick { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..4a2a8d23a2
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ # nothing to do
+ end
+
+ private
+ def subscriber_map
+ @subscriber_map ||= SubscriberMap.new
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 6465663c97..78f8aeb599 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -12,11 +12,15 @@ module ActionCable
end
def subscribe(channel, callback, success_callback = nil)
- listener.subscribe_to(channel, callback, success_callback)
+ listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
- listener.unsubscribe_from(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ listener.shutdown
end
def with_connection(&block) # :nodoc:
@@ -36,14 +40,14 @@ module ActionCable
@listener ||= Listener.new(self)
end
- class Listener
+ class Listener < SubscriberMap
def initialize(adapter)
+ super()
+
@adapter = adapter
- @subscribers = Hash.new { |h,k| h[k] = [] }
- @sync = Mutex.new
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,46 +55,45 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ ::EM.next_tick(&callback) if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
end
end
end
end
end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
- @subscribers[channel] << callback
- end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
end
- def unsubscribe_from(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
- end
- end
+ def invoke_callback(*)
+ ::EM.next_tick { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index e42ab2a03f..3b86354621 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -12,7 +12,7 @@ module ActionCable
def subscribe(channel, message_callback, success_callback = nil)
redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback(&success_callback) if success_callback
+ result.callback { |reply| success_callback.call } if success_callback
end
end
@@ -20,6 +20,11 @@ module ActionCable
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
private
def redis_connection_for_subscriptions
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/async_test.rb b/actioncable/test/subscription_adapter/async_test.rb
new file mode 100644
index 0000000000..8f413f14c2
--- /dev/null
+++ b/actioncable/test/subscription_adapter/async_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class AsyncAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'async' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
new file mode 100644
index 0000000000..d4a13be889
--- /dev/null
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -0,0 +1,142 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'action_cable/process/logging'
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+module CommonSubscriptionAdapterTest
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ server = ActionCable::Server::Base.new
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+
+ # and now the "real" setup for our test:
+ spawn_eventmachine
+
+ server.config.cable = cable_config.with_indifferent_access
+
+ adapter_klass = server.config.pubsub_adapter
+
+ @rx_adapter = adapter_klass.new(server)
+ @tx_adapter = adapter_klass.new(server)
+ end
+
+ def teardown
+ @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
+ @rx_adapter.shutdown if @rx_adapter
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+
+ def subscribe_as_queue(channel, adapter = @rx_adapter)
+ queue = Queue.new
+
+ callback = -> data { queue << data }
+ subscribed = Concurrent::Event.new
+ adapter.subscribe(channel, callback, Proc.new { subscribed.set })
+ subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ assert subscribed.set?
+
+ yield queue
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty queue
+ ensure
+ adapter.unsubscribe(channel, callback) if subscribed.set?
+ end
+
+
+ def test_subscribe_and_unsubscribe
+ subscribe_as_queue('channel') do |queue|
+ end
+ end
+
+ def test_basic_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+ end
+
+ def test_broadcast_after_unsubscribe
+ keep_queue = nil
+ subscribe_as_queue('channel') do |queue|
+ keep_queue = queue
+
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+
+ @tx_adapter.broadcast('channel', 'hello void')
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty keep_queue
+ end
+
+ def test_multiple_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'bananas')
+ @tx_adapter.broadcast('channel', 'apples')
+
+ received = []
+ 2.times { received << queue.pop }
+ assert_equal ['apples', 'bananas'], received.sort
+ end
+ end
+
+ def test_identical_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'hello')
+
+ assert_equal 'hello', queue_2.pop
+ end
+
+ assert_equal 'hello', queue.pop
+ end
+ end
+
+ def test_simultaneous_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('other channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'apples')
+ @tx_adapter.broadcast('other channel', 'oranges')
+
+ assert_equal 'apples', queue.pop
+ assert_equal 'oranges', queue_2.pop
+ end
+ end
+ end
+
+ def test_channel_filtered_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('other channel', 'one')
+ @tx_adapter.broadcast('channel', 'two')
+
+ assert_equal 'two', queue.pop
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/inline_test.rb b/actioncable/test/subscription_adapter/inline_test.rb
new file mode 100644
index 0000000000..75ea51e6b3
--- /dev/null
+++ b/actioncable/test/subscription_adapter/inline_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class InlineAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'inline' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb
new file mode 100644
index 0000000000..64c632b0cd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/postgresql_test.rb
@@ -0,0 +1,32 @@
+require 'test_helper'
+require_relative './common'
+
+require 'active_record'
+
+class PostgresqlAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ database_config = { 'adapter' => 'postgresql', 'database' => 'activerecord_unittest' }
+ ar_tests = File.expand_path('../../../activerecord/test', __dir__)
+ if Dir.exist?(ar_tests)
+ require File.join(ar_tests, 'config')
+ require File.join(ar_tests, 'support/config')
+ local_config = ARTest.config['arunit']
+ database_config.update local_config if local_config
+ end
+ ActiveRecord::Base.establish_connection database_config
+
+ super
+ end
+
+ def teardown
+ super
+
+ ActiveRecord::Base.clear_all_connections!
+ end
+
+ def cable_config
+ { adapter: 'postgresql' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
new file mode 100644
index 0000000000..8d52832c87
--- /dev/null
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+
+class RedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 65b45e0c89..6636ce078b 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -33,4 +33,8 @@ class ActionCable::TestCase < ActiveSupport::TestCase
EM.stop
end
end
+
+ def spawn_eventmachine
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ end
end
diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb
index 9911a3b98b..4a699cde27 100644
--- a/actioncable/test/worker_test.rb
+++ b/actioncable/test/worker_test.rb
@@ -13,6 +13,11 @@ class WorkerTest < ActiveSupport::TestCase
end
def connection
+ self
+ end
+
+ def logger
+ ActionCable.server.logger
end
end