From 84b1f0a3e622d35bf1fb1b2662bc0262a040e119 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Fri, 16 Oct 2015 21:05:33 -0500 Subject: Send subscription confirmation from server to the client to avoid race conditions. Without this, it's very easy to send messages over a subscription even before the redis pubsub has been fully initialized. Now we delay calling the subscription#connected method on the client side until we receive a subscription confirmation message from the server. --- lib/action_cable/channel/base.rb | 22 ++++++++++++++++++++++ lib/action_cable/channel/streams.rb | 12 +++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) (limited to 'lib/action_cable/channel') diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index df87064195..c8292be183 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -73,6 +73,8 @@ module ActionCable include Naming include Broadcasting + SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription' + on_subscribe :subscribed on_unsubscribe :unsubscribed @@ -120,6 +122,10 @@ module ActionCable @identifier = identifier @params = params + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + delegate_connection_identifiers subscribe_to_channel end @@ -165,6 +171,15 @@ module ActionCable end + protected + def defer_subscription_confirmation! + @defer_subscription_confirmation = true + end + + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + private def delegate_connection_identifiers connection.identifiers.each do |identifier| @@ -177,6 +192,7 @@ module ActionCable def subscribe_to_channel run_subscribe_callbacks + transmit_subscription_confirmation unless defer_subscription_confirmation? end @@ -213,6 +229,12 @@ module ActionCable def run_unsubscribe_callbacks self.class.on_unsubscribe_callbacks.each { |callback| send(callback) } end + + def transmit_subscription_confirmation + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE) + end + end end end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 9fffdf1789..b5ffa17f72 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -69,12 +69,18 @@ module ActionCable # Start streaming from the named broadcasting pubsub queue. Optionally, you can pass a callback that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - callback ||= default_stream_callback(broadcasting) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick { pubsub.subscribe broadcasting, &callback } - logger.info "#{self.class.name} is streaming from #{broadcasting}" + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end end # Start streaming the pubsub queue for the model in this channel. Optionally, you can pass a -- cgit v1.2.3