aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/storage_adapter/postgres.rb
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-14 16:11:02 +1030
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:59:01 -0500
commit78ff63ee4191758940ae8e0efaa5f9915af0a788 (patch)
tree84bccbb4aa9a0b0f84af1c16823c26fe8f387c8c /actioncable/lib/action_cable/storage_adapter/postgres.rb
parentbc413e814bbeafe8774b166bd2447ec84475b402 (diff)
downloadrails-78ff63ee4191758940ae8e0efaa5f9915af0a788.tar.gz
rails-78ff63ee4191758940ae8e0efaa5f9915af0a788.tar.bz2
rails-78ff63ee4191758940ae8e0efaa5f9915af0a788.zip
Listener no longer needs to be a singleton
We now only create one adapter instance for the server, so it can hold the listener. This in turn allows the listener to get the PG connection from the adapter, which will be a good place to allow more flexible configuration.
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter/postgres.rb')
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb110
1 files changed, 58 insertions, 52 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
index 119ea787d7..5d874533be 100644
--- a/actioncable/lib/action_cable/storage_adapter/postgres.rb
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -5,85 +5,91 @@ module ActionCable
class Postgres < Base
# The storage instance used for broadcasting. Not intended for direct user use.
def broadcast(channel, payload)
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
-
- unless pg_conn.is_a?(PG::Connection)
- raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
- end
-
+ with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
end
end
- def subscribe(channel, message_callback, success_callback = nil)
- Listener.instance.subscribe_to(channel, message_callback, success_callback)
+ def subscribe(channel, callback, success_callback = nil)
+ listener.subscribe_to(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- Listener.instance.unsubscribe_to(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.unsubscribe_to(channel, callback)
end
- class Listener
- include Singleton
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
- attr_accessor :subscribers
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
- def initialize
- @subscribers = Hash.new {|h,k| h[k] = [] }
- @sync = Mutex.new
- @queue = Queue.new
+ yield pg_conn
+ end
+ end
- Thread.new do
- Thread.current.abort_on_exception = true
- listen
- end
+ private
+ def listener
+ @listener ||= Listener.new(self)
end
- def listen
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
+ class Listener
+ def initialize(adapter)
+ @adapter = adapter
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ @queue = Queue.new
- loop do
- until @queue.empty?
- value = @queue.pop(true)
- if value.first == :listen
- pg_conn.exec("LISTEN #{value[1]}")
- ::EM.next_tick(&value[2]) if value[2]
- elsif value.first == :unlisten
- pg_conn.exec("UNLISTEN #{value[1]}")
- end
- end
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ def listen
+ @adapter.with_connection do |pg_conn|
+ loop do
+ until @queue.empty?
+ value = @queue.pop(true)
+ if value.first == :listen
+ pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
+ elsif value.first == :unlisten
+ pg_conn.exec("UNLISTEN #{value[1]}")
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
+ end
+ end
end
end
end
end
- end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def subscribe_to(channel, callback, success_callback)
+ @sync.synchronize do
+ if @subscribers[channel].empty?
+ @queue.push([:listen, channel, success_callback])
+ end
- @subscribers[channel] << callback
+ @subscribers[channel] << callback
+ end
end
- end
- def unsubscribe_to(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def unsubscribe_to(channel, callback)
+ @sync.synchronize do
+ @subscribers[channel].delete(callback)
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
+ if @subscribers[channel].empty?
+ @queue.push([:unlisten, channel])
+ end
end
end
end
- end
end
end
end