diff options
author | Matthew Draper <matthew@trebex.net> | 2016-01-14 16:11:02 +1030 |
---|---|---|
committer | Jon Moss <me@jonathanmoss.me> | 2016-01-18 18:59:01 -0500 |
commit | 78ff63ee4191758940ae8e0efaa5f9915af0a788 (patch) | |
tree | 84bccbb4aa9a0b0f84af1c16823c26fe8f387c8c /actioncable/lib/action_cable | |
parent | bc413e814bbeafe8774b166bd2447ec84475b402 (diff) | |
download | rails-78ff63ee4191758940ae8e0efaa5f9915af0a788.tar.gz rails-78ff63ee4191758940ae8e0efaa5f9915af0a788.tar.bz2 rails-78ff63ee4191758940ae8e0efaa5f9915af0a788.zip |
Listener no longer needs to be a singleton
We now only create one adapter instance for the server, so it can hold
the listener. This in turn allows the listener to get the PG connection
from the adapter, which will be a good place to allow more flexible
configuration.
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/postgres.rb | 110 |
1 files changed, 58 insertions, 52 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 119ea787d7..5d874533be 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -5,85 +5,91 @@ module ActionCable class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - + with_connection do |pg_conn| pg_conn.exec("NOTIFY #{channel}, '#{payload}'") end end - def subscribe(channel, message_callback, success_callback = nil) - Listener.instance.subscribe_to(channel, message_callback, success_callback) + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - Listener.instance.unsubscribe_to(channel, message_callback) + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) end - class Listener - include Singleton + def with_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection - attr_accessor :subscribers + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end - def initialize - @subscribers = Hash.new {|h,k| h[k] = [] } - @sync = Mutex.new - @queue = Queue.new + yield pg_conn + end + end - Thread.new do - Thread.current.abort_on_exception = true - listen - end + private + def listener + @listener ||= Listener.new(self) end - def listen - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new - loop do - until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end - end + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end end end end end - end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end - @subscribers[channel] << callback + @subscribers[channel] << callback + end end - end - def unsubscribe_to(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end end end end - end end end end |