aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/storage_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/storage_adapter')
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb110
1 files changed, 58 insertions, 52 deletions
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
index 119ea787d7..5d874533be 100644
--- a/actioncable/lib/action_cable/storage_adapter/postgres.rb
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -5,85 +5,91 @@ module ActionCable
class Postgres < Base
# The storage instance used for broadcasting. Not intended for direct user use.
def broadcast(channel, payload)
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
-
- unless pg_conn.is_a?(PG::Connection)
- raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
- end
-
+ with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
end
end
- def subscribe(channel, message_callback, success_callback = nil)
- Listener.instance.subscribe_to(channel, message_callback, success_callback)
+ def subscribe(channel, callback, success_callback = nil)
+ listener.subscribe_to(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- Listener.instance.unsubscribe_to(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.unsubscribe_to(channel, callback)
end
- class Listener
- include Singleton
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
- attr_accessor :subscribers
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
- def initialize
- @subscribers = Hash.new {|h,k| h[k] = [] }
- @sync = Mutex.new
- @queue = Queue.new
+ yield pg_conn
+ end
+ end
- Thread.new do
- Thread.current.abort_on_exception = true
- listen
- end
+ private
+ def listener
+ @listener ||= Listener.new(self)
end
- def listen
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
+ class Listener
+ def initialize(adapter)
+ @adapter = adapter
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ @queue = Queue.new
- loop do
- until @queue.empty?
- value = @queue.pop(true)
- if value.first == :listen
- pg_conn.exec("LISTEN #{value[1]}")
- ::EM.next_tick(&value[2]) if value[2]
- elsif value.first == :unlisten
- pg_conn.exec("UNLISTEN #{value[1]}")
- end
- end
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ def listen
+ @adapter.with_connection do |pg_conn|
+ loop do
+ until @queue.empty?
+ value = @queue.pop(true)
+ if value.first == :listen
+ pg_conn.exec("LISTEN #{value[1]}")
+ ::EM.next_tick(&value[2]) if value[2]
+ elsif value.first == :unlisten
+ pg_conn.exec("UNLISTEN #{value[1]}")
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
+ end
+ end
end
end
end
end
- end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def subscribe_to(channel, callback, success_callback)
+ @sync.synchronize do
+ if @subscribers[channel].empty?
+ @queue.push([:listen, channel, success_callback])
+ end
- @subscribers[channel] << callback
+ @subscribers[channel] << callback
+ end
end
- end
- def unsubscribe_to(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def unsubscribe_to(channel, callback)
+ @sync.synchronize do
+ @subscribers[channel].delete(callback)
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
+ if @subscribers[channel].empty?
+ @queue.push([:unlisten, channel])
+ end
end
end
end
- end
end
end
end