diff options
Diffstat (limited to 'actioncable/lib')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 26 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 3 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/subscriptions.rb | 4 |
3 files changed, 23 insertions, 10 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..1d9038b76a 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,7 +144,9 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # We use atomic fixnum to track the number of waiting tasks to avoid race conditions + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil @@ -153,6 +155,14 @@ module ActionCable subscribe_to_channel end + # This method is called after subscription has been added to the channel. + # Send confirmation here to avoid race conditions when client tries to perform actions + # right after receiving confirmation. + def registered! + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + # Extract the action name from the passed data and process it via the channel. The process will ensure # that the action requested is a public method on the channel declared by the user (so not one of the callbacks # like #subscribed). @@ -202,17 +212,21 @@ module ActionCable end def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value.positive? end def subscription_confirmation_sent? @subscription_confirmation_sent end + def registered? + @registered + end + def reject @reject_subscription = true end @@ -235,11 +249,7 @@ module ActionCable subscribed end - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end + reject_subscription if subscription_rejected? end def extract_action(data) diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..c9b58f3a04 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,8 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..ccac802332 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -29,7 +29,9 @@ module ActionCable subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] ||= subscription + subscription.registered! else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end |