diff options
-rw-r--r-- | lib/action_cable/channel/base.rb | 22 | ||||
-rw-r--r-- | lib/action_cable/channel/streams.rb | 12 | ||||
-rw-r--r-- | lib/assets/javascripts/cable.coffee | 2 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/connection.coffee | 10 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/subscriptions.coffee | 6 | ||||
-rw-r--r-- | test/channel/base_test.rb | 6 | ||||
-rw-r--r-- | test/channel/stream_test.rb | 35 | ||||
-rw-r--r-- | test/test_helper.rb | 1 |
8 files changed, 76 insertions, 18 deletions
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index df87064195..0b9aa2f4ad 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -73,6 +73,8 @@ module ActionCable include Naming include Broadcasting + SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze + on_subscribe :subscribed on_unsubscribe :unsubscribed @@ -120,6 +122,10 @@ module ActionCable @identifier = identifier @params = params + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + delegate_connection_identifiers subscribe_to_channel end @@ -165,6 +171,15 @@ module ActionCable end + protected + def defer_subscription_confirmation! + @defer_subscription_confirmation = true + end + + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + private def delegate_connection_identifiers connection.identifiers.each do |identifier| @@ -177,6 +192,7 @@ module ActionCable def subscribe_to_channel run_subscribe_callbacks + transmit_subscription_confirmation unless defer_subscription_confirmation? end @@ -213,6 +229,12 @@ module ActionCable def run_unsubscribe_callbacks self.class.on_unsubscribe_callbacks.each { |callback| send(callback) } end + + def transmit_subscription_confirmation + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE) + end + end end end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 9fffdf1789..b5ffa17f72 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -69,12 +69,18 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - callback ||= default_stream_callback(broadcasting) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick { pubsub.subscribe broadcasting, &callback } - logger.info "#{self.class.name} is streaming from #{broadcasting}" + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end end # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a diff --git a/lib/assets/javascripts/cable.coffee b/lib/assets/javascripts/cable.coffee index 0bd1757505..476d90ef72 100644 --- a/lib/assets/javascripts/cable.coffee +++ b/lib/assets/javascripts/cable.coffee @@ -3,6 +3,8 @@ @Cable = PING_IDENTIFIER: "_ping" + INTERNAL_MESSAGES: + SUBSCRIPTION_CONFIRMATION: 'confirm_subscription' createConsumer: (url) -> new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee index 90d8fac3e1..33159130c7 100644 --- a/lib/assets/javascripts/cable/connection.coffee +++ b/lib/assets/javascripts/cable/connection.coffee @@ -52,8 +52,14 @@ class Cable.Connection events: message: (event) -> - {identifier, message} = JSON.parse(event.data) - @consumer.subscriptions.notify(identifier, "received", message) + {identifier, message, type} = JSON.parse(event.data) + + if type? + switch type + when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION + @consumer.subscriptions.notify(identifier, "connected") + else + @consumer.subscriptions.notify(identifier, "received", message) open: -> @disconnected = false diff --git a/lib/assets/javascripts/cable/subscriptions.coffee b/lib/assets/javascripts/cable/subscriptions.coffee index eeaa697081..4efb384ee2 100644 --- a/lib/assets/javascripts/cable/subscriptions.coffee +++ b/lib/assets/javascripts/cable/subscriptions.coffee @@ -21,13 +21,11 @@ class Cable.Subscriptions add: (subscription) -> @subscriptions.push(subscription) @notify(subscription, "initialized") - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") reload: -> for subscription in @subscriptions - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") remove: (subscription) -> @subscriptions = (s for s in @subscriptions when s isnt subscription) diff --git a/test/channel/base_test.rb b/test/channel/base_test.rb index bac8569780..7eb8e15845 100644 --- a/test/channel/base_test.rb +++ b/test/channel/base_test.rb @@ -139,4 +139,10 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" } assert_equal expected, @connection.last_transmission end + + test "subscription confirmation" do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + assert_equal expected, @connection.last_transmission + end + end diff --git a/test/channel/stream_test.rb b/test/channel/stream_test.rb index 4e0248d7b4..08cfef5736 100644 --- a/test/channel/stream_test.rb +++ b/test/channel/stream_test.rb @@ -12,28 +12,45 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end end - setup do - @connection = TestConnection.new - end - test "streaming start and stop" do run_in_eventmachine do - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) } - channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) } + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) } channel.unsubscribe_from_channel end end test "stream_for" do run_in_eventmachine do + connection = TestConnection.new EM.next_tick do - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) } end - channel = ChatChannel.new @connection, "" + channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) end end + + test "stream_from subscription confirmation" do + EM.run do + connection = TestConnection.new + connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub + + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } + assert_nil connection.last_transmission + + EM::Timer.new(0.1) do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + + EM.run_deferred_callbacks + EM.stop + end + end + end + end diff --git a/test/test_helper.rb b/test/test_helper.rb index f8a9971077..935e50e900 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -8,6 +8,7 @@ Bundler.setup Bundler.require :default, :test require 'puma' +require 'em-hiredis' require 'mocha/mini_test' require 'rack/mock' |