From 4c5d5b75abe85d59e5cc9de9904fdef3b23ec25b Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 19:07:18 -0500 Subject: ActionCable::StorageAdapter ==> ActionCable::SubscriptionAdapter --- .../lib/action_cable/subscription_adapter/base.rb | 24 +++++ .../subscription_adapter/postgresql.rb | 101 +++++++++++++++++++++ .../lib/action_cable/subscription_adapter/redis.rb | 45 +++++++++ 3 files changed, 170 insertions(+) create mode 100644 actioncable/lib/action_cable/subscription_adapter/base.rb create mode 100644 actioncable/lib/action_cable/subscription_adapter/postgresql.rb create mode 100644 actioncable/lib/action_cable/subscription_adapter/redis.rb (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb new file mode 100644 index 0000000000..11910803e8 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/base.rb @@ -0,0 +1,24 @@ +module ActionCable + module SubscriptionAdapter + class Base + attr_reader :logger, :server + + def initialize(server) + @server = server + @logger = @server.logger + end + + def broadcast(channel, payload) + raise NotImplementedError + end + + def subscribe(channel, message_callback, success_callback = nil) + raise NotImplementedError + end + + def unsubscribe(channel, message_callback) + raise NotImplementedError + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb new file mode 100644 index 0000000000..f55b56a2b5 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -0,0 +1,101 @@ +require 'thread' + +begin + require 'pg' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end + +module ActionCable + module SubscriptionAdapter + class PostgreSQL < Base + # The storage instance used for broadcasting. Not intended for direct user use. + def broadcast(channel, payload) + with_connection do |pg_conn| + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) + end + + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) + end + + def with_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + yield pg_conn + end + end + + private + def listener + @listener ||= Listener.new(self) + end + + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end + + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end + end + end + end + end + + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end + + @subscribers[channel] << callback + end + end + + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) + + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb new file mode 100644 index 0000000000..c6d8371f16 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -0,0 +1,45 @@ +begin + require 'em-hiredis' + require 'redis' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end + +module ActionCable + module SubscriptionAdapter + class Redis < Base + def broadcast(channel, payload) + redis_conn.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback(&success_callback) if success_callback + end + end + + def unsubscribe(channel, message_callback) + hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback) + end + + private + + # The redis instance used for broadcasting. Not intended for direct user use. + def redis_conn + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + + # The EventMachine Redis instance used by the pubsub adapter. + def hi_redis_conn + @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end + end + end + def redis_conn + @redis_conn ||= ::Redis.new(@server.config.cable) + end + end + end +end -- cgit v1.2.3 From 67af248d51e9f5f56911c80855d4b14642582ea8 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 21:07:18 -0500 Subject: Small PostgreSQL adapter refactors / cleanup - Escape the channel name when subscribing in PG - Refactor popping the queue to make it easier to read --- .../subscription_adapter/postgresql.rb | 30 ++++++++++++---------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index f55b56a2b5..afa99355e8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -8,11 +8,11 @@ end module ActionCable module SubscriptionAdapter - class PostgreSQL < Base + class PostgreSQL < Base # :nodoc: # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) with_connection do |pg_conn| - pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") end end @@ -21,7 +21,7 @@ module ActionCable end def unsubscribe(channel, callback) - listener.unsubscribe_to(channel, callback) + listener.unsubscribe_from(channel, callback) end def with_connection(&block) # :nodoc: @@ -58,18 +58,20 @@ module ActionCable @adapter.with_connection do |pg_conn| loop do until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") + action, channel, callback = @queue.pop(true) + escaped_channel = pg_conn.escape_identifier(channel) + + if action == :listen + pg_conn.exec("LISTEN #{escaped_channel}") + ::EM.next_tick(&callback) if callback + elsif action == :unlisten + pg_conn.exec("UNLISTEN #{escaped_channel}") end + end - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } end end end @@ -86,7 +88,7 @@ module ActionCable end end - def unsubscribe_to(channel, callback) + def unsubscribe_from(channel, callback) @sync.synchronize do @subscribers[channel].delete(callback) -- cgit v1.2.3 From ae31da20cd250154c951b67d5625fc71ac27e2f1 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Sat, 16 Jan 2016 10:33:50 -0500 Subject: Fix code review comments - adapter -> pubsub (re)rename internally - Change variable names to match method names - Add EventMachine `~> 1.0` as a runtime dependency of ActionCable - Refactor dependency loading for adapters --- .../subscription_adapter/postgresql.rb | 8 ++--- .../lib/action_cable/subscription_adapter/redis.rb | 41 +++++++++------------- 2 files changed, 19 insertions(+), 30 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index afa99355e8..64c519beed 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -1,11 +1,7 @@ +gem 'pg', '~> 0.18' +require 'pg' require 'thread' -begin - require 'pg' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end - module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index c6d8371f16..9615430be4 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,19 +1,18 @@ -begin - require 'em-hiredis' - require 'redis' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' module ActionCable module SubscriptionAdapter - class Redis < Base + class Redis < Base # :nodoc: + # The redis instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - redis_conn.publish(channel, payload) + broadcast_redis_connection.publish(channel, payload) end def subscribe(channel, message_callback, success_callback = nil) - hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + subscription_redis_connection.pubsub.subscribe(channel, &message_callback).tap do |result| result.callback(&success_callback) if success_callback end end @@ -23,23 +22,17 @@ module ActionCable end private - - # The redis instance used for broadcasting. Not intended for direct user use. - def redis_conn - @broadcast ||= ::Redis.new(@server.config.config_opts) - end - - # The EventMachine Redis instance used by the pubsub adapter. - def hi_redis_conn - @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + def subscription_redis_connection + @subscription_redis_connection ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end - end - def redis_conn - @redis_conn ||= ::Redis.new(@server.config.cable) - end + + def broadcast_redis_connection + @broadcast_redis_connection ||= ::Redis.new(@server.config.cable) + end end end end -- cgit v1.2.3