From dccc15d4030f250f38987328b6201282c1ef34a5 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Fri, 22 Jan 2016 11:13:12 +1030 Subject: Split internal subscriber tracking from Postgres adapter --- .../subscription_adapter/postgresql.rb | 36 ++++++--------- .../subscription_adapter/subscriber_map.rb | 53 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 23 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 6465663c97..37247634db 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -12,11 +12,11 @@ module ActionCable end def subscribe(channel, callback, success_callback = nil) - listener.subscribe_to(channel, callback, success_callback) + listener.add_subscriber(channel, callback, success_callback) end def unsubscribe(channel, callback) - listener.unsubscribe_from(channel, callback) + listener.remove_subscriber(channel, callback) end def with_connection(&block) # :nodoc: @@ -36,11 +36,11 @@ module ActionCable @listener ||= Listener.new(self) end - class Listener + class Listener < SubscriberMap def initialize(adapter) + super() + @adapter = adapter - @subscribers = Hash.new { |h,k| h[k] = [] } - @sync = Mutex.new @queue = Queue.new Thread.new do @@ -65,32 +65,22 @@ module ActionCable end pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end + broadcast(chan, message) end end end end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end - - @subscribers[channel] << callback - end + def add_channel(channel, on_success) + @queue.push([:listen, channel, on_success]) end - def unsubscribe_from(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def remove_channel(channel) + @queue.push([:unlisten, channel]) + end - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) - end - end + def invoke_callback(*) + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb new file mode 100644 index 0000000000..37eed09793 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -0,0 +1,53 @@ +module ActionCable + module SubscriptionAdapter + class SubscriberMap + def initialize + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + end + + def add_subscriber(channel, subscriber, on_success) + @sync.synchronize do + new_channel = !@subscribers.key?(channel) + + @subscribers[channel] << subscriber + + if new_channel + add_channel channel, on_success + elsif on_success + on_success.call + end + end + end + + def remove_subscriber(channel, subscriber) + @sync.synchronize do + @subscribers[channel].delete(subscriber) + + if @subscribers[channel].empty? + @subscribers.delete channel + remove_channel channel + end + end + end + + def broadcast(channel, message) + list = @sync.synchronize { @subscribers[channel].dup } + list.each do |subscriber| + invoke_callback(subscriber, message) + end + end + + def add_channel(channel, on_success) + on_success.call if on_success + end + + def remove_channel(channel) + end + + def invoke_callback(callback, message) + callback.call message + end + end + end +end -- cgit v1.2.3