path: root/actioncable/lib/action_cable/subscription_adapter
diff options
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
6 files changed, 288 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..c88b03947a
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def subscriber_map
+ @subscriber_map ||= AsyncSubscriberMap.new
+ end
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
new file mode 100644
index 0000000000..796db5ffa3
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,28 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Base
+ attr_reader :logger, :server
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+ def broadcast(channel, payload)
+ raise NotImplementedError
+ end
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+ def unsubscribe(channel, message_callback)
+ raise NotImplementedError
+ end
+ def shutdown
+ raise NotImplementedError
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..4a2a8d23a2
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+ def shutdown
+ # nothing to do
+ end
+ private
+ def subscriber_map
+ @subscriber_map ||= SubscriberMap.new
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
new file mode 100644
index 0000000000..3ce1bbed68
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,101 @@
+gem 'pg', '~> 0.18'
+require 'pg'
+require 'thread'
+module ActionCable
+ module SubscriptionAdapter
+ class PostgreSQL < Base # :nodoc:
+ def broadcast(channel, payload)
+ with_connection do |pg_conn|
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
+ end
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
+ end
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+ def shutdown
+ listener.shutdown
+ end
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+ yield pg_conn
+ end
+ end
+ private
+ def listener
+ @listener ||= Listener.new(self)
+ end
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+ @adapter = adapter
+ @queue = Queue.new
+ @thread = Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+ def listen
+ @adapter.with_connection do |pg_conn|
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ Concurrent.global_io_executor << callback if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
+ end
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
+ end
+ end
+ end
+ end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
+ end
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
new file mode 100644
index 0000000000..a035e3988d
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -0,0 +1,58 @@
+require 'thread'
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+module ActionCable
+ module SubscriptionAdapter
+ class Redis < Base # :nodoc:
+ @@mutex = Mutex.new
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+ @subscribers[channel] << subscriber
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+ def remove_channel(channel)
+ end
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end