aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-14 15:55:41 +1030
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:59:00 -0500
commit2815db356977b506f63d155aecf71ee010a64c62 (patch)
tree6ac39a82985340ff7cea0b8a976e88799d895e24 /actioncable
parent7b79ae0335b67377636cf2ba7be70a4119ca90cd (diff)
downloadrails-2815db356977b506f63d155aecf71ee010a64c62.tar.gz
rails-2815db356977b506f63d155aecf71ee010a64c62.tar.bz2
rails-2815db356977b506f63d155aecf71ee010a64c62.zip
Pull the action methods directly onto the adapter
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb8
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb4
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb3
-rw-r--r--actioncable/lib/action_cable/storage_adapter/base.rb10
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb54
-rw-r--r--actioncable/lib/action_cable/storage_adapter/redis.rb22
8 files changed, 51 insertions, 56 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index b5ffa17f72..89dcbdfa27 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,10 +76,10 @@ module ActionCable
streams << [ broadcasting, callback ]
EM.next_tick do
- pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ adapter.subscribe(broadcasting, callback, lambda do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
- end
+ end)
end
end
@@ -92,13 +92,13 @@ module ActionCable
def stop_all_streams
streams.each do |broadcasting, callback|
- pubsub.unsubscribe_proc broadcasting, callback
+ adapter.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
private
- delegate :pubsub, to: :connection
+ delegate :adapter, to: :connection
def streams
@_streams ||= []
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index bb8850aaa0..2d7f99b09a 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,7 +49,7 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :pubsub, to: :server
+ delegate :worker_pool, :adapter, to: :server
def initialize(server, env)
@server, @env = server, env
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 63ba293877..c618e9d087 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_channel, &callback) }
+ EM.next_tick { adapter.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 6539745c79..e0703101aa 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -46,8 +46,8 @@ module ActionCable
end
# The pubsub adapter used for all streams/broadcasting.
- def pubsub
- @pubsub ||= config.storage_adapter.new(self).pubsub
+ def adapter
+ @adapter ||= config.storage_adapter.new(self)
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 847ef50971..021589b82d 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -39,8 +39,7 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast
- broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message)
+ server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb
index 26b3ded676..4330bc28f1 100644
--- a/actioncable/lib/action_cable/storage_adapter/base.rb
+++ b/actioncable/lib/action_cable/storage_adapter/base.rb
@@ -8,13 +8,15 @@ module ActionCable
@logger = @server.logger
end
- # Storage connection instance used for broadcasting. Not intended for direct user use.
- def broadcast
+ def broadcast(channel, payload)
raise NotImplementedError
end
- # Storage connection instance used for pubsub.
- def pubsub
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+
+ def unsubscribe(channel, message_callback)
raise NotImplementedError
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
index 67bc2cd77a..07c2c7ce6a 100644
--- a/actioncable/lib/action_cable/storage_adapter/postgres.rb
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -4,12 +4,24 @@ module ActionCable
module StorageAdapter
class Postgres < Base
# The storage instance used for broadcasting. Not intended for direct user use.
- def broadcast
- @broadcast ||= PostgresWrapper.new
+ def broadcast(channel, payload)
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ end
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ Listener.instance.subscribe_to(channel, message_callback, success_callback)
end
- def pubsub
- PostgresWrapper.new
+ def unsubscribe(channel, message_callback)
+ Listener.instance.unsubscribe_to(channel, message_callback)
end
class Listener
@@ -37,6 +49,7 @@ module ActionCable
value = @queue.pop(true)
if value.first == :listen
pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
elsif value.first == :unlisten
pg_conn.exec("UNLISTEN #{value[1]}")
end
@@ -51,10 +64,10 @@ module ActionCable
end
end
- def subscribe_to(channel, callback)
+ def subscribe_to(channel, callback, success_callback)
@sync.synchronize do
if @subscribers[channel].empty?
- @queue.push([:listen, channel])
+ @queue.push([:listen, channel, success_callback])
end
@subscribers[channel] << callback
@@ -75,35 +88,6 @@ module ActionCable
end
end
end
-
- class PostgresWrapper
- def publish(channel, message)
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
-
- unless pg_conn.is_a?(PG::Connection)
- raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
- end
-
- pg_conn.exec("NOTIFY #{channel}, '#{message}'")
- end
- end
-
- def subscribe(channel, &callback)
- Listener.instance.subscribe_to(channel, callback)
- # Needed for channel/streams.rb#L79
- ::EM::DefaultDeferrable.new
- end
-
- def unsubscribe(channel)
- Listener.instance.unsubscribe_to(channel)
- end
-
- def unsubscribe_proc(channel, block)
- Listener.instance.unsubscribe_to(channel, block)
- end
- end
-
end
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb
index 7b712b9b03..3f0f6c4172 100644
--- a/actioncable/lib/action_cable/storage_adapter/redis.rb
+++ b/actioncable/lib/action_cable/storage_adapter/redis.rb
@@ -4,19 +4,29 @@ require 'redis'
module ActionCable
module StorageAdapter
class Redis < Base
- # The redis instance used for broadcasting. Not intended for direct user use.
- def broadcast
- @broadcast ||= ::Redis.new(@server.config.config_opts)
+ def broadcast(channel, payload)
+ redis_conn.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback(&success_callback) if success_callback
+ end
end
- def pubsub
- redis.pubsub
+ def unsubscribe(channel, message_callback)
+ hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback)
end
private
+ # The redis instance used for broadcasting. Not intended for direct user use.
+ def redis_conn
+ @broadcast ||= ::Redis.new(@server.config.config_opts)
+ end
+
# The EventMachine Redis instance used by the pubsub adapter.
- def redis
+ def hi_redis_conn
@redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."