aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib
diff options
context:
space:
mode:
authorpalkan <dementiev.vm@gmail.com>2016-09-21 02:57:10 +0300
committerpalkan <dementiev.vm@gmail.com>2016-09-22 20:25:09 +0300
commit3e68d8b872b48ecb45268a7e5fcb937e68f2724f (patch)
tree6c1a25c42a8baea1141af95016abbce1f7d78469 /actioncable/lib
parent03a209e92aeed1e724b3ff787ec77936b7163ca5 (diff)
downloadrails-3e68d8b872b48ecb45268a7e5fcb937e68f2724f.tar.gz
rails-3e68d8b872b48ecb45268a7e5fcb937e68f2724f.tar.bz2
rails-3e68d8b872b48ecb45268a7e5fcb937e68f2724f.zip
Add Channel#ensure_confirmation_sent; call #subscribe_to_channel after initializing
Diffstat (limited to 'actioncable/lib')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb42
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb3
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb6
3 files changed, 24 insertions, 27 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 1d9038b76a..a866044f95 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -145,22 +145,13 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
#
- # We use atomic fixnum to track the number of waiting tasks to avoid race conditions
+ # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
- subscribe_to_channel
- end
-
- # This method is called after subscription has been added to the channel.
- # Send confirmation here to avoid race conditions when client tries to perform actions
- # right after receiving confirmation.
- def registered!
- @defer_subscription_confirmation_counter.decrement
- transmit_subscription_confirmation unless defer_subscription_confirmation?
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
@@ -179,6 +170,17 @@ module ActionCable
end
end
+ # This method is called after subscription has been added to the connection
+ # and confirms or rejects the subscription.
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ reject_subscription if subscription_rejected?
+ ensure_confirmation_sent
+ end
+
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc:
@@ -211,22 +213,24 @@ module ActionCable
end
end
+ def ensure_confirmation_sent
+ return if subscription_rejected?
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
def defer_subscription_confirmation!
@defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation_counter.value.positive?
+ @defer_subscription_confirmation_counter.value > 0
end
def subscription_confirmation_sent?
@subscription_confirmation_sent
end
- def registered?
- @registered
- end
-
def reject
@reject_subscription = true
end
@@ -244,14 +248,6 @@ module ActionCable
end
end
- def subscribe_to_channel
- run_callbacks :subscribe do
- subscribed
- end
-
- reject_subscription if subscription_rejected?
- end
-
def extract_action(data)
(data["action"].presence || :receive).to_sym
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index c9b58f3a04..e480b93df0 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -84,8 +84,7 @@ module ActionCable
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
- @defer_subscription_confirmation_counter.decrement
- transmit_subscription_confirmation unless defer_subscription_confirmation?
+ ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index ccac802332..00511aead5 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -26,12 +26,14 @@ module ActionCable
id_key = data["identifier"]
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ return if subscriptions.key?(id_key)
+
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
subscription = subscription_klass.new(connection, id_key, id_options)
- subscriptions[id_key] ||= subscription
- subscription.registered!
+ subscriptions[id_key] = subscription
+ subscription.subscribe_to_channel
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end