aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/channel/streams.rb
diff options
context:
space:
mode:
authorPratik Naik <pratiknaik@gmail.com>2015-10-16 21:05:33 -0500
committerPratik Naik <pratiknaik@gmail.com>2015-10-16 21:11:21 -0500
commit84b1f0a3e622d35bf1fb1b2662bc0262a040e119 (patch)
treecbfef05e91f6e071cc90379ebdf63cf227b91d83 /lib/action_cable/channel/streams.rb
parentdf5a32dfbc94723d847aa8d8034041a2bd8751e2 (diff)
downloadrails-84b1f0a3e622d35bf1fb1b2662bc0262a040e119.tar.gz
rails-84b1f0a3e622d35bf1fb1b2662bc0262a040e119.tar.bz2
rails-84b1f0a3e622d35bf1fb1b2662bc0262a040e119.zip
Send subscription confirmation from server to the client to avoid race conditions.
Without this, it's very easy to send messages over a subscription even before the redis pubsub has been fully initialized. Now we delay calling the subscription#connected method on the client side until we receive a subscription confirmation message from the server.
Diffstat (limited to 'lib/action_cable/channel/streams.rb')
-rw-r--r--lib/action_cable/channel/streams.rb12
1 files changed, 9 insertions, 3 deletions
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
index 9fffdf1789..b5ffa17f72 100644
--- a/lib/action_cable/channel/streams.rb
+++ b/lib/action_cable/channel/streams.rb
@@ -69,12 +69,18 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
- callback ||= default_stream_callback(broadcasting)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+ callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick { pubsub.subscribe broadcasting, &callback }
- logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ EM.next_tick do
+ pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end
+ end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a