aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb26
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb3
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb4
3 files changed, 23 insertions, 10 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 2e589a2cfa..1d9038b76a 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -144,7 +144,9 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
- @defer_subscription_confirmation = false
+ #
+ # We use atomic fixnum to track the number of waiting tasks to avoid race conditions
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
@@ -153,6 +155,14 @@ module ActionCable
subscribe_to_channel
end
+ # This method is called after subscription has been added to the channel.
+ # Send confirmation here to avoid race conditions when client tries to perform actions
+ # right after receiving confirmation.
+ def registered!
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
# Extract the action name from the passed data and process it via the channel. The process will ensure
# that the action requested is a public method on the channel declared by the user (so not one of the callbacks
# like #subscribed).
@@ -202,17 +212,21 @@ module ActionCable
end
def defer_subscription_confirmation!
- @defer_subscription_confirmation = true
+ @defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation
+ @defer_subscription_confirmation_counter.value.positive?
end
def subscription_confirmation_sent?
@subscription_confirmation_sent
end
+ def registered?
+ @registered
+ end
+
def reject
@reject_subscription = true
end
@@ -235,11 +249,7 @@ module ActionCable
subscribed
end
- if subscription_rejected?
- reject_subscription
- else
- transmit_subscription_confirmation unless defer_subscription_confirmation?
- end
+ reject_subscription if subscription_rejected?
end
def extract_action(data)
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 561750d713..c9b58f3a04 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -84,7 +84,8 @@ module ActionCable
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
- transmit_subscription_confirmation
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 9060183249..ccac802332 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -29,7 +29,9 @@ module ActionCable
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
- subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ subscription = subscription_klass.new(connection, id_key, id_options)
+ subscriptions[id_key] ||= subscription
+ subscription.registered!
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end