aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb24
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb101
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb45
3 files changed, 170 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
new file mode 100644
index 0000000000..11910803e8
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,24 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Base
+ attr_reader :logger, :server
+
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+
+ def broadcast(channel, payload)
+ raise NotImplementedError
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+
+ def unsubscribe(channel, message_callback)
+ raise NotImplementedError
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
new file mode 100644
index 0000000000..f55b56a2b5
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,101 @@
+require 'thread'
+
+begin
+ require 'pg'
+rescue Gem::LoadError => e
+ raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)."
+end
+
+module ActionCable
+ module SubscriptionAdapter
+ class PostgreSQL < Base
+ # The storage instance used for broadcasting. Not intended for direct user use.
+ def broadcast(channel, payload)
+ with_connection do |pg_conn|
+ pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ end
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ listener.subscribe_to(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ listener.unsubscribe_to(channel, callback)
+ end
+
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ yield pg_conn
+ end
+ end
+
+ private
+ def listener
+ @listener ||= Listener.new(self)
+ end
+
+ class Listener
+ def initialize(adapter)
+ @adapter = adapter
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ @queue = Queue.new
+
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+
+ def listen
+ @adapter.with_connection do |pg_conn|
+ loop do
+ until @queue.empty?
+ value = @queue.pop(true)
+ if value.first == :listen
+ pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
+ elsif value.first == :unlisten
+ pg_conn.exec("UNLISTEN #{value[1]}")
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def subscribe_to(channel, callback, success_callback)
+ @sync.synchronize do
+ if @subscribers[channel].empty?
+ @queue.push([:listen, channel, success_callback])
+ end
+
+ @subscribers[channel] << callback
+ end
+ end
+
+ def unsubscribe_to(channel, callback)
+ @sync.synchronize do
+ @subscribers[channel].delete(callback)
+
+ if @subscribers[channel].empty?
+ @queue.push([:unlisten, channel])
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
new file mode 100644
index 0000000000..c6d8371f16
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -0,0 +1,45 @@
+begin
+ require 'em-hiredis'
+ require 'redis'
+rescue Gem::LoadError => e
+ raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)."
+end
+
+module ActionCable
+ module SubscriptionAdapter
+ class Redis < Base
+ def broadcast(channel, payload)
+ redis_conn.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback(&success_callback) if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ private
+
+ # The redis instance used for broadcasting. Not intended for direct user use.
+ def redis_conn
+ @broadcast ||= ::Redis.new(@server.config.config_opts)
+ end
+
+ # The EventMachine Redis instance used by the pubsub adapter.
+ def hi_redis_conn
+ @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ def redis_conn
+ @redis_conn ||= ::Redis.new(@server.config.cable)
+ end
+ end
+ end
+end