diff options
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter')
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/postgres.rb | 110 |
1 files changed, 58 insertions, 52 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 119ea787d7..5d874533be 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -5,85 +5,91 @@ module ActionCable class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - + with_connection do |pg_conn| pg_conn.exec("NOTIFY #{channel}, '#{payload}'") end end - def subscribe(channel, message_callback, success_callback = nil) - Listener.instance.subscribe_to(channel, message_callback, success_callback) + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - Listener.instance.unsubscribe_to(channel, message_callback) + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) end - class Listener - include Singleton + def with_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection - attr_accessor :subscribers + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end - def initialize - @subscribers = Hash.new {|h,k| h[k] = [] } - @sync = Mutex.new - @queue = Queue.new + yield pg_conn + end + end - Thread.new do - Thread.current.abort_on_exception = true - listen - end + private + def listener + @listener ||= Listener.new(self) end - def listen - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new - loop do - until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end - end + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end end end end end - end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end - @subscribers[channel] << callback + @subscribers[channel] << callback + end end - end - def unsubscribe_to(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end end end end - end end end end |