aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/storage_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter')
-rw-r--r--actioncable/lib/action_cable/storage_adapter/base.rb10
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb54
-rw-r--r--actioncable/lib/action_cable/storage_adapter/redis.rb22
3 files changed, 41 insertions, 45 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb
index 26b3ded676..4330bc28f1 100644
--- a/actioncable/lib/action_cable/storage_adapter/base.rb
+++ b/actioncable/lib/action_cable/storage_adapter/base.rb
@@ -8,13 +8,15 @@ module ActionCable
@logger = @server.logger
end
- # Storage connection instance used for broadcasting. Not intended for direct user use.
- def broadcast
+ def broadcast(channel, payload)
raise NotImplementedError
end
- # Storage connection instance used for pubsub.
- def pubsub
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+
+ def unsubscribe(channel, message_callback)
raise NotImplementedError
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
index 67bc2cd77a..07c2c7ce6a 100644
--- a/actioncable/lib/action_cable/storage_adapter/postgres.rb
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -4,12 +4,24 @@ module ActionCable
module StorageAdapter
class Postgres < Base
# The storage instance used for broadcasting. Not intended for direct user use.
- def broadcast
- @broadcast ||= PostgresWrapper.new
+ def broadcast(channel, payload)
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ end
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ Listener.instance.subscribe_to(channel, message_callback, success_callback)
end
- def pubsub
- PostgresWrapper.new
+ def unsubscribe(channel, message_callback)
+ Listener.instance.unsubscribe_to(channel, message_callback)
end
class Listener
@@ -37,6 +49,7 @@ module ActionCable
value = @queue.pop(true)
if value.first == :listen
pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
elsif value.first == :unlisten
pg_conn.exec("UNLISTEN #{value[1]}")
end
@@ -51,10 +64,10 @@ module ActionCable
end
end
- def subscribe_to(channel, callback)
+ def subscribe_to(channel, callback, success_callback)
@sync.synchronize do
if @subscribers[channel].empty?
- @queue.push([:listen, channel])
+ @queue.push([:listen, channel, success_callback])
end
@subscribers[channel] << callback
@@ -75,35 +88,6 @@ module ActionCable
end
end
end
-
- class PostgresWrapper
- def publish(channel, message)
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
-
- unless pg_conn.is_a?(PG::Connection)
- raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
- end
-
- pg_conn.exec("NOTIFY #{channel}, '#{message}'")
- end
- end
-
- def subscribe(channel, &callback)
- Listener.instance.subscribe_to(channel, callback)
- # Needed for channel/streams.rb#L79
- ::EM::DefaultDeferrable.new
- end
-
- def unsubscribe(channel)
- Listener.instance.unsubscribe_to(channel)
- end
-
- def unsubscribe_proc(channel, block)
- Listener.instance.unsubscribe_to(channel, block)
- end
- end
-
end
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb
index 7b712b9b03..3f0f6c4172 100644
--- a/actioncable/lib/action_cable/storage_adapter/redis.rb
+++ b/actioncable/lib/action_cable/storage_adapter/redis.rb
@@ -4,19 +4,29 @@ require 'redis'
module ActionCable
module StorageAdapter
class Redis < Base
- # The redis instance used for broadcasting. Not intended for direct user use.
- def broadcast
- @broadcast ||= ::Redis.new(@server.config.config_opts)
+ def broadcast(channel, payload)
+ redis_conn.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback(&success_callback) if success_callback
+ end
end
- def pubsub
- redis.pubsub
+ def unsubscribe(channel, message_callback)
+ hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback)
end
private
+ # The redis instance used for broadcasting. Not intended for direct user use.
+ def redis_conn
+ @broadcast ||= ::Redis.new(@server.config.config_opts)
+ end
+
# The EventMachine Redis instance used by the pubsub adapter.
- def redis
+ def hi_redis_conn
@redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."