aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb26
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb71
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb23
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb53
6 files changed, 164 insertions, 35 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..c88b03947a
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def subscriber_map
+ @subscriber_map ||= AsyncSubscriberMap.new
+ end
+
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ Concurrent.global_io_executor.post { super }
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..4a2a8d23a2
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ # nothing to do
+ end
+
+ private
+ def subscriber_map
+ @subscriber_map ||= SubscriberMap.new
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 6465663c97..3ce1bbed68 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -12,11 +12,15 @@ module ActionCable
end
def subscribe(channel, callback, success_callback = nil)
- listener.subscribe_to(channel, callback, success_callback)
+ listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
- listener.unsubscribe_from(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ listener.shutdown
end
def with_connection(&block) # :nodoc:
@@ -36,14 +40,14 @@ module ActionCable
@listener ||= Listener.new(self)
end
- class Listener
+ class Listener < SubscriberMap
def initialize(adapter)
+ super()
+
@adapter = adapter
- @subscribers = Hash.new { |h,k| h[k] = [] }
- @sync = Mutex.new
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,46 +55,45 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ Concurrent.global_io_executor << callback if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
end
end
end
end
end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
- @subscribers[channel] << callback
- end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
end
- def unsubscribe_from(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
- end
- end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index e42ab2a03f..a035e3988d 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,18 +1,25 @@
+require 'thread'
+
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ @@mutex = Mutex.new
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
def subscribe(channel, message_callback, success_callback = nil)
redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback(&success_callback) if success_callback
+ result.callback { |reply| success_callback.call } if success_callback
end
end
@@ -20,8 +27,14 @@ module ActionCable
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
private
def redis_connection_for_subscriptions
+ ensure_reactor_running
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
@@ -32,6 +45,14 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end