diff options
8 files changed, 51 insertions, 56 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index b5ffa17f72..89dcbdfa27 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,10 +76,10 @@ module ActionCable streams << [ broadcasting, callback ] EM.next_tick do - pubsub.subscribe(broadcasting, &callback).callback do |reply| + adapter.subscribe(broadcasting, callback, lambda do |reply| transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" - end + end) end end @@ -92,13 +92,13 @@ module ActionCable def stop_all_streams streams.each do |broadcasting, callback| - pubsub.unsubscribe_proc broadcasting, callback + adapter.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end private - delegate :pubsub, to: :connection + delegate :adapter, to: :connection def streams @_streams ||= [] diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..2d7f99b09a 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :worker_pool, :adapter, to: :server def initialize(server, env) @server, @env = server, env diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 63ba293877..c618e9d087 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, &callback) } + EM.next_tick { adapter.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 6539745c79..e0703101aa 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -46,8 +46,8 @@ module ActionCable end # The pubsub adapter used for all streams/broadcasting. - def pubsub - @pubsub ||= config.storage_adapter.new(self).pubsub + def adapter + @adapter ||= config.storage_adapter.new(self) end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 847ef50971..021589b82d 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,8 +39,7 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast - broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message) + server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb index 26b3ded676..4330bc28f1 100644 --- a/actioncable/lib/action_cable/storage_adapter/base.rb +++ b/actioncable/lib/action_cable/storage_adapter/base.rb @@ -8,13 +8,15 @@ module ActionCable @logger = @server.logger end - # Storage connection instance used for broadcasting. Not intended for direct user use. - def broadcast + def broadcast(channel, payload) raise NotImplementedError end - # Storage connection instance used for pubsub. - def pubsub + def subscribe(channel, message_callback, success_callback = nil) + raise NotImplementedError + end + + def unsubscribe(channel, message_callback) raise NotImplementedError end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 67bc2cd77a..07c2c7ce6a 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -4,12 +4,24 @@ module ActionCable module StorageAdapter class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= PostgresWrapper.new + def broadcast(channel, payload) + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, message_callback, success_callback = nil) + Listener.instance.subscribe_to(channel, message_callback, success_callback) end - def pubsub - PostgresWrapper.new + def unsubscribe(channel, message_callback) + Listener.instance.unsubscribe_to(channel, message_callback) end class Listener @@ -37,6 +49,7 @@ module ActionCable value = @queue.pop(true) if value.first == :listen pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] elsif value.first == :unlisten pg_conn.exec("UNLISTEN #{value[1]}") end @@ -51,10 +64,10 @@ module ActionCable end end - def subscribe_to(channel, callback) + def subscribe_to(channel, callback, success_callback) @sync.synchronize do if @subscribers[channel].empty? - @queue.push([:listen, channel]) + @queue.push([:listen, channel, success_callback]) end @subscribers[channel] << callback @@ -75,35 +88,6 @@ module ActionCable end end end - - class PostgresWrapper - def publish(channel, message) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - - pg_conn.exec("NOTIFY #{channel}, '#{message}'") - end - end - - def subscribe(channel, &callback) - Listener.instance.subscribe_to(channel, callback) - # Needed for channel/streams.rb#L79 - ::EM::DefaultDeferrable.new - end - - def unsubscribe(channel) - Listener.instance.unsubscribe_to(channel) - end - - def unsubscribe_proc(channel, block) - Listener.instance.unsubscribe_to(channel, block) - end - end - end end end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 7b712b9b03..3f0f6c4172 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -4,19 +4,29 @@ require 'redis' module ActionCable module StorageAdapter class Redis < Base - # The redis instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= ::Redis.new(@server.config.config_opts) + def broadcast(channel, payload) + redis_conn.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback(&success_callback) if success_callback + end end - def pubsub - redis.pubsub + def unsubscribe(channel, message_callback) + hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback) end private + # The redis instance used for broadcasting. Not intended for direct user use. + def redis_conn + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + # The EventMachine Redis instance used by the pubsub adapter. - def redis + def hi_redis_conn @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." |