From 78ff63ee4191758940ae8e0efaa5f9915af0a788 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 14 Jan 2016 16:11:02 +1030 Subject: Listener no longer needs to be a singleton We now only create one adapter instance for the server, so it can hold the listener. This in turn allows the listener to get the PG connection from the adapter, which will be a good place to allow more flexible configuration. --- .../lib/action_cable/storage_adapter/postgres.rb | 110 +++++++++++---------- 1 file changed, 58 insertions(+), 52 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 119ea787d7..5d874533be 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -5,85 +5,91 @@ module ActionCable class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - + with_connection do |pg_conn| pg_conn.exec("NOTIFY #{channel}, '#{payload}'") end end - def subscribe(channel, message_callback, success_callback = nil) - Listener.instance.subscribe_to(channel, message_callback, success_callback) + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - Listener.instance.unsubscribe_to(channel, message_callback) + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) end - class Listener - include Singleton + def with_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection - attr_accessor :subscribers + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end - def initialize - @subscribers = Hash.new {|h,k| h[k] = [] } - @sync = Mutex.new - @queue = Queue.new + yield pg_conn + end + end - Thread.new do - Thread.current.abort_on_exception = true - listen - end + private + def listener + @listener ||= Listener.new(self) end - def listen - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new - loop do - until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end - end + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end end end end end - end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end - @subscribers[channel] << callback + @subscribers[channel] << callback + end end - end - def unsubscribe_to(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end end end end - end end end end -- cgit v1.2.3