From 03a209e92aeed1e724b3ff787ec77936b7163ca5 Mon Sep 17 00:00:00 2001 From: palkan Date: Mon, 19 Sep 2016 12:29:23 +0300 Subject: [Fix #25381] Avoid race condition on subscription confirmation --- actioncable/lib/action_cable/channel/base.rb | 26 +++++++++++++------ actioncable/lib/action_cable/channel/streams.rb | 3 ++- .../lib/action_cable/connection/subscriptions.rb | 4 ++- actioncable/test/channel/base_test.rb | 9 ++++++- actioncable/test/channel/stream_test.rb | 29 +++++++++++++++++++--- actioncable/test/test_helper.rb | 4 +++ 6 files changed, 60 insertions(+), 15 deletions(-) diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..1d9038b76a 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,7 +144,9 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # We use atomic fixnum to track the number of waiting tasks to avoid race conditions + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil @@ -153,6 +155,14 @@ module ActionCable subscribe_to_channel end + # This method is called after subscription has been added to the channel. + # Send confirmation here to avoid race conditions when client tries to perform actions + # right after receiving confirmation. + def registered! + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + # Extract the action name from the passed data and process it via the channel. The process will ensure # that the action requested is a public method on the channel declared by the user (so not one of the callbacks # like #subscribed). @@ -202,17 +212,21 @@ module ActionCable end def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value.positive? end def subscription_confirmation_sent? @subscription_confirmation_sent end + def registered? + @registered + end + def reject @reject_subscription = true end @@ -235,11 +249,7 @@ module ActionCable subscribed end - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end + reject_subscription if subscription_rejected? end def extract_action(data) diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..c9b58f3a04 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,8 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..ccac802332 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -29,7 +29,9 @@ module ActionCable subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] ||= subscription + subscription.registered! else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index 2bb3214f74..7a332cdc3c 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -150,8 +150,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase assert_equal expected, @connection.last_transmission end - test "subscription confirmation" do + test "do not send subscription confirmation on initialize" do + assert_nil @connection.last_transmission + end + + test "subscription confirmation on registration" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + @channel.registered! assert_equal expected, @connection.last_transmission end @@ -208,6 +213,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit_subscription_confirmation" do begin + @channel.registered! + events = [] ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index da26f81a5c..b1f7560f9a 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -84,7 +84,9 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", id: 1 + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.registered! + assert_nil connection.last_transmission wait_for_async @@ -101,6 +103,7 @@ module ActionCable::StreamTests connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" + channel.registered! channel.send_confirmation channel.send_confirmation @@ -114,7 +117,7 @@ module ActionCable::StreamTests end end - require "action_cable/subscription_adapter/inline" + require "action_cable/subscription_adapter/async" class UserCallbackChannel < ActionCable::Channel::Base def subscribed @@ -124,9 +127,16 @@ module ActionCable::StreamTests end end - class StreamEncodingTest < ActionCable::TestCase + class MultiChatChannel < ActionCable::Channel::Base + def subscribed + stream_from "main_room" + stream_from "test_all_rooms" + end + end + + class StreamFromTest < ActionCable::TestCase setup do - @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end @@ -153,6 +163,17 @@ module ActionCable::StreamTests end end + test "subscription confirmation should only be sent out once with muptiple stream_from" do + run_in_eventmachine do + connection = open_connection + expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" } + connection.websocket.expects(:transmit).with(expected.to_json) + receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {}) + + wait_for_async + end + end + private def subscribe_to(connection, identifiers:) receive connection, command: "subscribe", identifiers: identifiers diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 39855ea252..af3c7eee1d 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase end def wait_for_executor(executor) + # do not wait forever, wait 2s + timeout = 2 until executor.completed_task_count == executor.scheduled_task_count sleep 0.1 + timeout -= 0.1 + raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0 end end end -- cgit v1.2.3