aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/storage_adapter/postgres.rb
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-14 15:55:41 +1030
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:59:00 -0500
commit2815db356977b506f63d155aecf71ee010a64c62 (patch)
tree6ac39a82985340ff7cea0b8a976e88799d895e24 /actioncable/lib/action_cable/storage_adapter/postgres.rb
parent7b79ae0335b67377636cf2ba7be70a4119ca90cd (diff)
downloadrails-2815db356977b506f63d155aecf71ee010a64c62.tar.gz
rails-2815db356977b506f63d155aecf71ee010a64c62.tar.bz2
rails-2815db356977b506f63d155aecf71ee010a64c62.zip
Pull the action methods directly onto the adapter
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter/postgres.rb')
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb54
1 files changed, 19 insertions, 35 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
index 67bc2cd77a..07c2c7ce6a 100644
--- a/actioncable/lib/action_cable/storage_adapter/postgres.rb
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -4,12 +4,24 @@ module ActionCable
module StorageAdapter
class Postgres < Base
# The storage instance used for broadcasting. Not intended for direct user use.
- def broadcast
- @broadcast ||= PostgresWrapper.new
+ def broadcast(channel, payload)
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ end
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ Listener.instance.subscribe_to(channel, message_callback, success_callback)
end
- def pubsub
- PostgresWrapper.new
+ def unsubscribe(channel, message_callback)
+ Listener.instance.unsubscribe_to(channel, message_callback)
end
class Listener
@@ -37,6 +49,7 @@ module ActionCable
value = @queue.pop(true)
if value.first == :listen
pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
elsif value.first == :unlisten
pg_conn.exec("UNLISTEN #{value[1]}")
end
@@ -51,10 +64,10 @@ module ActionCable
end
end
- def subscribe_to(channel, callback)
+ def subscribe_to(channel, callback, success_callback)
@sync.synchronize do
if @subscribers[channel].empty?
- @queue.push([:listen, channel])
+ @queue.push([:listen, channel, success_callback])
end
@subscribers[channel] << callback
@@ -75,35 +88,6 @@ module ActionCable
end
end
end
-
- class PostgresWrapper
- def publish(channel, message)
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
-
- unless pg_conn.is_a?(PG::Connection)
- raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
- end
-
- pg_conn.exec("NOTIFY #{channel}, '#{message}'")
- end
- end
-
- def subscribe(channel, &callback)
- Listener.instance.subscribe_to(channel, callback)
- # Needed for channel/streams.rb#L79
- ::EM::DefaultDeferrable.new
- end
-
- def unsubscribe(channel)
- Listener.instance.unsubscribe_to(channel)
- end
-
- def unsubscribe_proc(channel, block)
- Listener.instance.unsubscribe_to(channel, block)
- end
- end
-
end
end
end