From dccc15d4030f250f38987328b6201282c1ef34a5 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Fri, 22 Jan 2016 11:13:12 +1030 Subject: Split internal subscriber tracking from Postgres adapter --- .../lib/action_cable/subscription_adapter.rb | 5 +- .../subscription_adapter/postgresql.rb | 36 ++++++--------- .../subscription_adapter/subscriber_map.rb | 53 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 24 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb (limited to 'actioncable/lib/action_cable') diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb index e770f4fb00..72e62f3daf 100644 --- a/actioncable/lib/action_cable/subscription_adapter.rb +++ b/actioncable/lib/action_cable/subscription_adapter.rb @@ -1,5 +1,8 @@ module ActionCable module SubscriptionAdapter - autoload :Base, 'action_cable/subscription_adapter/base' + extend ActiveSupport::Autoload + + autoload :Base + autoload :SubscriberMap end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 6465663c97..37247634db 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -12,11 +12,11 @@ module ActionCable end def subscribe(channel, callback, success_callback = nil) - listener.subscribe_to(channel, callback, success_callback) + listener.add_subscriber(channel, callback, success_callback) end def unsubscribe(channel, callback) - listener.unsubscribe_from(channel, callback) + listener.remove_subscriber(channel, callback) end def with_connection(&block) # :nodoc: @@ -36,11 +36,11 @@ module ActionCable @listener ||= Listener.new(self) end - class Listener + class Listener < SubscriberMap def initialize(adapter) + super() + @adapter = adapter - @subscribers = Hash.new { |h,k| h[k] = [] } - @sync = Mutex.new @queue = Queue.new Thread.new do @@ -65,32 +65,22 @@ module ActionCable end pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end + broadcast(chan, message) end end end end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end - - @subscribers[channel] << callback - end + def add_channel(channel, on_success) + @queue.push([:listen, channel, on_success]) end - def unsubscribe_from(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def remove_channel(channel) + @queue.push([:unlisten, channel]) + end - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) - end - end + def invoke_callback(*) + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb new file mode 100644 index 0000000000..37eed09793 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -0,0 +1,53 @@ +module ActionCable + module SubscriptionAdapter + class SubscriberMap + def initialize + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + end + + def add_subscriber(channel, subscriber, on_success) + @sync.synchronize do + new_channel = !@subscribers.key?(channel) + + @subscribers[channel] << subscriber + + if new_channel + add_channel channel, on_success + elsif on_success + on_success.call + end + end + end + + def remove_subscriber(channel, subscriber) + @sync.synchronize do + @subscribers[channel].delete(subscriber) + + if @subscribers[channel].empty? + @subscribers.delete channel + remove_channel channel + end + end + end + + def broadcast(channel, message) + list = @sync.synchronize { @subscribers[channel].dup } + list.each do |subscriber| + invoke_callback(subscriber, message) + end + end + + def add_channel(channel, on_success) + on_success.call if on_success + end + + def remove_channel(channel) + end + + def invoke_callback(callback, message) + callback.call message + end + end + end +end -- cgit v1.2.3 From e81bb80cb4214ff795f7a2bb03f699e2a7f7d45a Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Sun, 24 Jan 2016 15:51:21 +1030 Subject: Normalize on no arguments for the success callback --- actioncable/lib/action_cable/channel/streams.rb | 2 +- actioncable/lib/action_cable/subscription_adapter/redis.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'actioncable/lib/action_cable') diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 589946c3db..e2876ef6fa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,7 +76,7 @@ module ActionCable streams << [ broadcasting, callback ] EM.next_tick do - pubsub.subscribe(broadcasting, callback, lambda do |reply| + pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index e42ab2a03f..90abf21367 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -12,7 +12,7 @@ module ActionCable def subscribe(channel, message_callback, success_callback = nil) redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback(&success_callback) if success_callback + result.callback { |reply| success_callback.call } if success_callback end end -- cgit v1.2.3 From b17a7e4c4daa4ab54223b0c5e2d03b491c070e0e Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Fri, 22 Jan 2016 11:13:51 +1030 Subject: Add Async and Inline adapters Just like their ActiveJob equivalents, these only work within the current process. --- .../lib/action_cable/subscription_adapter/async.rb | 22 ++++++++++++++++++++++ .../action_cable/subscription_adapter/inline.rb | 22 ++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 actioncable/lib/action_cable/subscription_adapter/async.rb create mode 100644 actioncable/lib/action_cable/subscription_adapter/inline.rb (limited to 'actioncable/lib/action_cable') diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb new file mode 100644 index 0000000000..85d4892e4c --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -0,0 +1,22 @@ +require 'action_cable/subscription_adapter/inline' + +module ActionCable + module SubscriptionAdapter + class Async < Inline # :nodoc: + private + def subscriber_map + @subscriber_map ||= AsyncSubscriberMap.new + end + + class AsyncSubscriberMap < SubscriberMap + def add_subscriber(*) + ::EM.next_tick { super } + end + + def invoke_callback(*) + ::EM.next_tick { super } + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb new file mode 100644 index 0000000000..19747601be --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -0,0 +1,22 @@ +module ActionCable + module SubscriptionAdapter + class Inline < Base # :nodoc: + def broadcast(channel, payload) + subscriber_map.broadcast(channel, payload) + end + + def subscribe(channel, callback, success_callback = nil) + subscriber_map.add_subscriber(channel, callback, success_callback) + end + + def unsubscribe(channel, callback) + subscriber_map.remove_subscriber(channel, callback) + end + + private + def subscriber_map + @subscriber_map ||= SubscriberMap.new + end + end + end +end -- cgit v1.2.3 From 7363ad43f598d6833d3c773f56e58a9f5d6e2e1f Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Sun, 24 Jan 2016 19:00:06 +1030 Subject: Allow subscription adapters to be shut down --- .../lib/action_cable/subscription_adapter/base.rb | 4 +++ .../action_cable/subscription_adapter/inline.rb | 4 +++ .../subscription_adapter/postgresql.rb | 41 ++++++++++++++-------- .../lib/action_cable/subscription_adapter/redis.rb | 5 +++ 4 files changed, 40 insertions(+), 14 deletions(-) (limited to 'actioncable/lib/action_cable') diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb index 11910803e8..796db5ffa3 100644 --- a/actioncable/lib/action_cable/subscription_adapter/base.rb +++ b/actioncable/lib/action_cable/subscription_adapter/base.rb @@ -19,6 +19,10 @@ module ActionCable def unsubscribe(channel, message_callback) raise NotImplementedError end + + def shutdown + raise NotImplementedError + end end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 19747601be..4a2a8d23a2 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -13,6 +13,10 @@ module ActionCable subscriber_map.remove_subscriber(channel, callback) end + def shutdown + # nothing to do + end + private def subscriber_map @subscriber_map ||= SubscriberMap.new diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 37247634db..78f8aeb599 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -19,6 +19,10 @@ module ActionCable listener.remove_subscriber(channel, callback) end + def shutdown + listener.shutdown + end + def with_connection(&block) # :nodoc: ActiveRecord::Base.connection_pool.with_connection do |ar_conn| pg_conn = ar_conn.raw_connection @@ -43,7 +47,7 @@ module ActionCable @adapter = adapter @queue = Queue.new - Thread.new do + @thread = Thread.new do Thread.current.abort_on_exception = true listen end @@ -51,26 +55,35 @@ module ActionCable def listen @adapter.with_connection do |pg_conn| - loop do - until @queue.empty? - action, channel, callback = @queue.pop(true) - escaped_channel = pg_conn.escape_identifier(channel) - - if action == :listen - pg_conn.exec("LISTEN #{escaped_channel}") - ::EM.next_tick(&callback) if callback - elsif action == :unlisten - pg_conn.exec("UNLISTEN #{escaped_channel}") + catch :shutdown do + loop do + until @queue.empty? + action, channel, callback = @queue.pop(true) + + case action + when :listen + pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") + ::EM.next_tick(&callback) if callback + when :unlisten + pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") + when :shutdown + throw :shutdown + end end - end - pg_conn.wait_for_notify(1) do |chan, pid, message| - broadcast(chan, message) + pg_conn.wait_for_notify(1) do |chan, pid, message| + broadcast(chan, message) + end end end end end + def shutdown + @queue.push([:shutdown]) + Thread.pass while @thread.alive? + end + def add_channel(channel, on_success) @queue.push([:listen, channel, on_success]) end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 90abf21367..3b86354621 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -20,6 +20,11 @@ module ActionCable redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) end + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + private def redis_connection_for_subscriptions @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| -- cgit v1.2.3