diff options
author | Matthew Draper <matthew@trebex.net> | 2016-01-14 15:55:41 +1030 |
---|---|---|
committer | Jon Moss <me@jonathanmoss.me> | 2016-01-18 18:59:00 -0500 |
commit | 2815db356977b506f63d155aecf71ee010a64c62 (patch) | |
tree | 6ac39a82985340ff7cea0b8a976e88799d895e24 /actioncable/lib/action_cable/storage_adapter/postgres.rb | |
parent | 7b79ae0335b67377636cf2ba7be70a4119ca90cd (diff) | |
download | rails-2815db356977b506f63d155aecf71ee010a64c62.tar.gz rails-2815db356977b506f63d155aecf71ee010a64c62.tar.bz2 rails-2815db356977b506f63d155aecf71ee010a64c62.zip |
Pull the action methods directly onto the adapter
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter/postgres.rb')
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/postgres.rb | 54 |
1 files changed, 19 insertions, 35 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 67bc2cd77a..07c2c7ce6a 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -4,12 +4,24 @@ module ActionCable module StorageAdapter class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= PostgresWrapper.new + def broadcast(channel, payload) + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, message_callback, success_callback = nil) + Listener.instance.subscribe_to(channel, message_callback, success_callback) end - def pubsub - PostgresWrapper.new + def unsubscribe(channel, message_callback) + Listener.instance.unsubscribe_to(channel, message_callback) end class Listener @@ -37,6 +49,7 @@ module ActionCable value = @queue.pop(true) if value.first == :listen pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] elsif value.first == :unlisten pg_conn.exec("UNLISTEN #{value[1]}") end @@ -51,10 +64,10 @@ module ActionCable end end - def subscribe_to(channel, callback) + def subscribe_to(channel, callback, success_callback) @sync.synchronize do if @subscribers[channel].empty? - @queue.push([:listen, channel]) + @queue.push([:listen, channel, success_callback]) end @subscribers[channel] << callback @@ -75,35 +88,6 @@ module ActionCable end end end - - class PostgresWrapper - def publish(channel, message) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - - pg_conn.exec("NOTIFY #{channel}, '#{message}'") - end - end - - def subscribe(channel, &callback) - Listener.instance.subscribe_to(channel, callback) - # Needed for channel/streams.rb#L79 - ::EM::DefaultDeferrable.new - end - - def unsubscribe(channel) - Listener.instance.unsubscribe_to(channel) - end - - def unsubscribe_proc(channel, block) - Listener.instance.unsubscribe_to(channel, block) - end - end - end end end |