diff options
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter')
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/base.rb | 10 | ||||
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/postgres.rb | 54 | ||||
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/redis.rb | 22 |
3 files changed, 41 insertions, 45 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb index 26b3ded676..4330bc28f1 100644 --- a/actioncable/lib/action_cable/storage_adapter/base.rb +++ b/actioncable/lib/action_cable/storage_adapter/base.rb @@ -8,13 +8,15 @@ module ActionCable @logger = @server.logger end - # Storage connection instance used for broadcasting. Not intended for direct user use. - def broadcast + def broadcast(channel, payload) raise NotImplementedError end - # Storage connection instance used for pubsub. - def pubsub + def subscribe(channel, message_callback, success_callback = nil) + raise NotImplementedError + end + + def unsubscribe(channel, message_callback) raise NotImplementedError end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 67bc2cd77a..07c2c7ce6a 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -4,12 +4,24 @@ module ActionCable module StorageAdapter class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= PostgresWrapper.new + def broadcast(channel, payload) + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, message_callback, success_callback = nil) + Listener.instance.subscribe_to(channel, message_callback, success_callback) end - def pubsub - PostgresWrapper.new + def unsubscribe(channel, message_callback) + Listener.instance.unsubscribe_to(channel, message_callback) end class Listener @@ -37,6 +49,7 @@ module ActionCable value = @queue.pop(true) if value.first == :listen pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] elsif value.first == :unlisten pg_conn.exec("UNLISTEN #{value[1]}") end @@ -51,10 +64,10 @@ module ActionCable end end - def subscribe_to(channel, callback) + def subscribe_to(channel, callback, success_callback) @sync.synchronize do if @subscribers[channel].empty? - @queue.push([:listen, channel]) + @queue.push([:listen, channel, success_callback]) end @subscribers[channel] << callback @@ -75,35 +88,6 @@ module ActionCable end end end - - class PostgresWrapper - def publish(channel, message) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - - pg_conn.exec("NOTIFY #{channel}, '#{message}'") - end - end - - def subscribe(channel, &callback) - Listener.instance.subscribe_to(channel, callback) - # Needed for channel/streams.rb#L79 - ::EM::DefaultDeferrable.new - end - - def unsubscribe(channel) - Listener.instance.unsubscribe_to(channel) - end - - def unsubscribe_proc(channel, block) - Listener.instance.unsubscribe_to(channel, block) - end - end - end end end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 7b712b9b03..3f0f6c4172 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -4,19 +4,29 @@ require 'redis' module ActionCable module StorageAdapter class Redis < Base - # The redis instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= ::Redis.new(@server.config.config_opts) + def broadcast(channel, payload) + redis_conn.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback(&success_callback) if success_callback + end end - def pubsub - redis.pubsub + def unsubscribe(channel, message_callback) + hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback) end private + # The redis instance used for broadcasting. Not intended for direct user use. + def redis_conn + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + # The EventMachine Redis instance used by the pubsub adapter. - def redis + def hi_redis_conn @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." |