diff options
-rw-r--r-- | actioncable/CHANGELOG.md | 8 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 38 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/subscriptions.rb | 6 | ||||
-rw-r--r-- | actioncable/test/channel/base_test.rb | 15 | ||||
-rw-r--r-- | actioncable/test/channel/periodic_timers_test.rb | 1 | ||||
-rw-r--r-- | actioncable/test/channel/rejection_test.rb | 2 | ||||
-rw-r--r-- | actioncable/test/channel/stream_test.rb | 31 | ||||
-rw-r--r-- | actioncable/test/test_helper.rb | 4 |
9 files changed, 83 insertions, 24 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index 2e6c2f2afc..137c88d91b 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,11 @@ +* Prevent race where the client could receive and act upon a + subscription confirmation before the channel's `subscribed` method + completed. + + Fixes #25381. + + *Vladimir Dementyev* + * Buffer writes to websocket connections, to avoid blocking threads that could be doing more useful things. diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -201,12 +213,18 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -230,18 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 13deb62662..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,10 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index 2bb3214f74..9a3a3581e6 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase @channel = ChatChannel.new @connection, "{id: 1}", id: 1 end - test "should subscribe to a channel on initialize" do + test "should subscribe to a channel" do + @channel.subscribe_to_channel assert_equal 1, @channel.room.id end test "on subscribe callbacks" do + @channel.subscribe_to_channel assert @channel.subscribed end @@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "unsubscribing from a channel" do + @channel.subscribe_to_channel + assert @channel.room assert @channel.subscribed? @@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase assert_equal expected, @connection.last_transmission end - test "subscription confirmation" do + test "do not send subscription confirmation on initialize" do + assert_nil @connection.last_transmission + end + + test "subscription confirmation on subscribe_to_channel" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + @channel.subscribe_to_channel assert_equal expected, @connection.last_transmission end @@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit_subscription_confirmation" do begin + @channel.subscribe_to_channel + events = [] ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 529abb9535..2ee711fd29 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) channel = ChatChannel.new @connection, "{id: 1}", id: 1 + channel.subscribe_to_channel channel.unsubscribe_from_channel assert_equal [], channel.send(:active_periodic_timers) end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index faf35ad048..99c4a7603a 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "subscription rejection" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission @@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "does not execute action if subscription is rejected" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index da26f81a5c..31dcde2e29 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -53,6 +53,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -64,6 +65,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = SymbolChannel.new connection, "" + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -76,6 +78,7 @@ module ActionCable::StreamTests connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" + channel.subscribe_to_channel channel.stream_for Room.new(1) end end @@ -84,7 +87,9 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", id: 1 + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel + assert_nil connection.last_transmission wait_for_async @@ -114,7 +119,7 @@ module ActionCable::StreamTests end end - require "action_cable/subscription_adapter/inline" + require "action_cable/subscription_adapter/async" class UserCallbackChannel < ActionCable::Channel::Base def subscribed @@ -124,9 +129,16 @@ module ActionCable::StreamTests end end - class StreamEncodingTest < ActionCable::TestCase + class MultiChatChannel < ActionCable::Channel::Base + def subscribed + stream_from "main_room" + stream_from "test_all_rooms" + end + end + + class StreamFromTest < ActionCable::TestCase setup do - @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end @@ -153,6 +165,17 @@ module ActionCable::StreamTests end end + test "subscription confirmation should only be sent out once with muptiple stream_from" do + run_in_eventmachine do + connection = open_connection + expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" } + connection.websocket.expects(:transmit).with(expected.to_json) + receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {}) + + wait_for_async + end + end + private def subscribe_to(connection, identifiers:) receive connection, command: "subscribe", identifiers: identifiers diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 39855ea252..af3c7eee1d 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase end def wait_for_executor(executor) + # do not wait forever, wait 2s + timeout = 2 until executor.completed_task_count == executor.scheduled_task_count sleep 0.1 + timeout -= 0.1 + raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0 end end end |