aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-22 11:13:12 +1030
committerMatthew Draper <matthew@trebex.net>2016-01-24 15:52:47 +1030
commitdccc15d4030f250f38987328b6201282c1ef34a5 (patch)
tree0ed4a4769529bc89de3030bc62c56a3d8947db55 /actioncable/lib/action_cable/subscription_adapter/postgresql.rb
parent83d2c39d5eb8d82ba124b6725d08c8e90760c764 (diff)
downloadrails-dccc15d4030f250f38987328b6201282c1ef34a5.tar.gz
rails-dccc15d4030f250f38987328b6201282c1ef34a5.tar.bz2
rails-dccc15d4030f250f38987328b6201282c1ef34a5.zip
Split internal subscriber tracking from Postgres adapter
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb36
1 files changed, 13 insertions, 23 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 6465663c97..37247634db 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -12,11 +12,11 @@ module ActionCable
end
def subscribe(channel, callback, success_callback = nil)
- listener.subscribe_to(channel, callback, success_callback)
+ listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
- listener.unsubscribe_from(channel, callback)
+ listener.remove_subscriber(channel, callback)
end
def with_connection(&block) # :nodoc:
@@ -36,11 +36,11 @@ module ActionCable
@listener ||= Listener.new(self)
end
- class Listener
+ class Listener < SubscriberMap
def initialize(adapter)
+ super()
+
@adapter = adapter
- @subscribers = Hash.new { |h,k| h[k] = [] }
- @sync = Mutex.new
@queue = Queue.new
Thread.new do
@@ -65,32 +65,22 @@ module ActionCable
end
pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
- end
+ broadcast(chan, message)
end
end
end
end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
-
- @subscribers[channel] << callback
- end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
end
- def unsubscribe_from(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
- end
- end
+ def invoke_callback(*)
+ ::EM.next_tick { super }
end
end
end