aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/action_cable/channel/base.rb22
-rw-r--r--lib/action_cable/channel/streams.rb12
-rw-r--r--lib/assets/javascripts/cable.coffee2
-rw-r--r--lib/assets/javascripts/cable/connection.coffee10
-rw-r--r--lib/assets/javascripts/cable/subscriptions.coffee6
-rw-r--r--test/channel/base_test.rb6
-rw-r--r--test/channel/stream_test.rb35
-rw-r--r--test/test_helper.rb1
8 files changed, 76 insertions, 18 deletions
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb
index df87064195..0b9aa2f4ad 100644
--- a/lib/action_cable/channel/base.rb
+++ b/lib/action_cable/channel/base.rb
@@ -73,6 +73,8 @@ module ActionCable
include Naming
include Broadcasting
+ SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze
+
on_subscribe :subscribed
on_unsubscribe :unsubscribed
@@ -120,6 +122,10 @@ module ActionCable
@identifier = identifier
@params = params
+ # When a channel is streaming via redis pubsub, we want to delay the confirmation
+ # transmission until redis pubsub subscription is confirmed.
+ @defer_subscription_confirmation = false
+
delegate_connection_identifiers
subscribe_to_channel
end
@@ -165,6 +171,15 @@ module ActionCable
end
+ protected
+ def defer_subscription_confirmation!
+ @defer_subscription_confirmation = true
+ end
+
+ def defer_subscription_confirmation?
+ @defer_subscription_confirmation
+ end
+
private
def delegate_connection_identifiers
connection.identifiers.each do |identifier|
@@ -177,6 +192,7 @@ module ActionCable
def subscribe_to_channel
run_subscribe_callbacks
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
end
@@ -213,6 +229,12 @@ module ActionCable
def run_unsubscribe_callbacks
self.class.on_unsubscribe_callbacks.each { |callback| send(callback) }
end
+
+ def transmit_subscription_confirmation
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE)
+ end
+
end
end
end
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
index 9fffdf1789..b5ffa17f72 100644
--- a/lib/action_cable/channel/streams.rb
+++ b/lib/action_cable/channel/streams.rb
@@ -69,12 +69,18 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
- callback ||= default_stream_callback(broadcasting)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+ callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick { pubsub.subscribe broadcasting, &callback }
- logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ EM.next_tick do
+ pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end
+ end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
diff --git a/lib/assets/javascripts/cable.coffee b/lib/assets/javascripts/cable.coffee
index 0bd1757505..476d90ef72 100644
--- a/lib/assets/javascripts/cable.coffee
+++ b/lib/assets/javascripts/cable.coffee
@@ -3,6 +3,8 @@
@Cable =
PING_IDENTIFIER: "_ping"
+ INTERNAL_MESSAGES:
+ SUBSCRIPTION_CONFIRMATION: 'confirm_subscription'
createConsumer: (url) ->
new Cable.Consumer url
diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee
index 90d8fac3e1..33159130c7 100644
--- a/lib/assets/javascripts/cable/connection.coffee
+++ b/lib/assets/javascripts/cable/connection.coffee
@@ -52,8 +52,14 @@ class Cable.Connection
events:
message: (event) ->
- {identifier, message} = JSON.parse(event.data)
- @consumer.subscriptions.notify(identifier, "received", message)
+ {identifier, message, type} = JSON.parse(event.data)
+
+ if type?
+ switch type
+ when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION
+ @consumer.subscriptions.notify(identifier, "connected")
+ else
+ @consumer.subscriptions.notify(identifier, "received", message)
open: ->
@disconnected = false
diff --git a/lib/assets/javascripts/cable/subscriptions.coffee b/lib/assets/javascripts/cable/subscriptions.coffee
index eeaa697081..4efb384ee2 100644
--- a/lib/assets/javascripts/cable/subscriptions.coffee
+++ b/lib/assets/javascripts/cable/subscriptions.coffee
@@ -21,13 +21,11 @@ class Cable.Subscriptions
add: (subscription) ->
@subscriptions.push(subscription)
@notify(subscription, "initialized")
- if @sendCommand(subscription, "subscribe")
- @notify(subscription, "connected")
+ @sendCommand(subscription, "subscribe")
reload: ->
for subscription in @subscriptions
- if @sendCommand(subscription, "subscribe")
- @notify(subscription, "connected")
+ @sendCommand(subscription, "subscribe")
remove: (subscription) ->
@subscriptions = (s for s in @subscriptions when s isnt subscription)
diff --git a/test/channel/base_test.rb b/test/channel/base_test.rb
index bac8569780..7eb8e15845 100644
--- a/test/channel/base_test.rb
+++ b/test/channel/base_test.rb
@@ -139,4 +139,10 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
assert_equal expected, @connection.last_transmission
end
+
+ test "subscription confirmation" do
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ assert_equal expected, @connection.last_transmission
+ end
+
end
diff --git a/test/channel/stream_test.rb b/test/channel/stream_test.rb
index 4e0248d7b4..08cfef5736 100644
--- a/test/channel/stream_test.rb
+++ b/test/channel/stream_test.rb
@@ -12,28 +12,45 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
end
end
- setup do
- @connection = TestConnection.new
- end
-
test "streaming start and stop" do
run_in_eventmachine do
- @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
- channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
+ connection = TestConnection.new
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
+ channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
- @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
channel.unsubscribe_from_channel
end
end
test "stream_for" do
run_in_eventmachine do
+ connection = TestConnection.new
EM.next_tick do
- @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
end
- channel = ChatChannel.new @connection, ""
+ channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
end
end
+
+ test "stream_from subscription confirmation" do
+ EM.run do
+ connection = TestConnection.new
+ connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub
+
+ channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
+ assert_nil connection.last_transmission
+
+ EM::Timer.new(0.1) do
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
+
+ EM.run_deferred_callbacks
+ EM.stop
+ end
+ end
+ end
+
end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index f8a9971077..935e50e900 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -8,6 +8,7 @@ Bundler.setup
Bundler.require :default, :test
require 'puma'
+require 'em-hiredis'
require 'mocha/mini_test'
require 'rack/mock'