From 03a209e92aeed1e724b3ff787ec77936b7163ca5 Mon Sep 17 00:00:00 2001 From: palkan Date: Mon, 19 Sep 2016 12:29:23 +0300 Subject: [Fix #25381] Avoid race condition on subscription confirmation --- actioncable/lib/action_cable/channel/base.rb | 26 +++++++++++++++++-------- actioncable/lib/action_cable/channel/streams.rb | 3 ++- 2 files changed, 20 insertions(+), 9 deletions(-) (limited to 'actioncable/lib/action_cable/channel') diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..1d9038b76a 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,7 +144,9 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # We use atomic fixnum to track the number of waiting tasks to avoid race conditions + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil @@ -153,6 +155,14 @@ module ActionCable subscribe_to_channel end + # This method is called after subscription has been added to the channel. + # Send confirmation here to avoid race conditions when client tries to perform actions + # right after receiving confirmation. + def registered! + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + # Extract the action name from the passed data and process it via the channel. The process will ensure # that the action requested is a public method on the channel declared by the user (so not one of the callbacks # like #subscribed). @@ -202,17 +212,21 @@ module ActionCable end def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value.positive? end def subscription_confirmation_sent? @subscription_confirmation_sent end + def registered? + @registered + end + def reject @reject_subscription = true end @@ -235,11 +249,7 @@ module ActionCable subscribed end - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end + reject_subscription if subscription_rejected? end def extract_action(data) diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..c9b58f3a04 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,8 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end -- cgit v1.2.3 From 3e68d8b872b48ecb45268a7e5fcb937e68f2724f Mon Sep 17 00:00:00 2001 From: palkan Date: Wed, 21 Sep 2016 02:57:10 +0300 Subject: Add Channel#ensure_confirmation_sent; call #subscribe_to_channel after initializing --- actioncable/lib/action_cable/channel/base.rb | 42 +++++++++++-------------- actioncable/lib/action_cable/channel/streams.rb | 3 +- 2 files changed, 20 insertions(+), 25 deletions(-) (limited to 'actioncable/lib/action_cable/channel') diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 1d9038b76a..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -145,22 +145,13 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. # - # We use atomic fixnum to track the number of waiting tasks to avoid race conditions + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel - end - - # This method is called after subscription has been added to the channel. - # Send confirmation here to avoid race conditions when client tries to perform actions - # right after receiving confirmation. - def registered! - @defer_subscription_confirmation_counter.decrement - transmit_subscription_confirmation unless defer_subscription_confirmation? end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -179,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -211,22 +213,24 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation_counter.value.positive? + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @subscription_confirmation_sent end - def registered? - @registered - end - def reject @reject_subscription = true end @@ -244,14 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - reject_subscription if subscription_rejected? - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index c9b58f3a04..e480b93df0 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,8 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - @defer_subscription_confirmation_counter.decrement - transmit_subscription_confirmation unless defer_subscription_confirmation? + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end -- cgit v1.2.3