aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--actioncable/CHANGELOG.md8
-rw-r--r--actioncable/lib/action_cable/channel/base.rb38
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb2
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb6
-rw-r--r--actioncable/test/channel/base_test.rb15
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb1
-rw-r--r--actioncable/test/channel/rejection_test.rb2
-rw-r--r--actioncable/test/channel/stream_test.rb31
-rw-r--r--actioncable/test/test_helper.rb4
9 files changed, 83 insertions, 24 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index 2e6c2f2afc..137c88d91b 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,11 @@
+* Prevent race where the client could receive and act upon a
+ subscription confirmation before the channel's `subscribed` method
+ completed.
+
+ Fixes #25381.
+
+ *Vladimir Dementyev*
+
* Buffer writes to websocket connections, to avoid blocking threads
that could be doing more useful things.
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 2e589a2cfa..a866044f95 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -144,13 +144,14 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
- @defer_subscription_confirmation = false
+ #
+ # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
- subscribe_to_channel
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
@@ -169,6 +170,17 @@ module ActionCable
end
end
+ # This method is called after subscription has been added to the connection
+ # and confirms or rejects the subscription.
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ reject_subscription if subscription_rejected?
+ ensure_confirmation_sent
+ end
+
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc:
@@ -201,12 +213,18 @@ module ActionCable
end
end
+ def ensure_confirmation_sent
+ return if subscription_rejected?
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
def defer_subscription_confirmation!
- @defer_subscription_confirmation = true
+ @defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation
+ @defer_subscription_confirmation_counter.value > 0
end
def subscription_confirmation_sent?
@@ -230,18 +248,6 @@ module ActionCable
end
end
- def subscribe_to_channel
- run_callbacks :subscribe do
- subscribed
- end
-
- if subscription_rejected?
- reject_subscription
- else
- transmit_subscription_confirmation unless defer_subscription_confirmation?
- end
- end
-
def extract_action(data)
(data["action"].presence || :receive).to_sym
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 13deb62662..dbba333353 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -84,7 +84,7 @@ module ActionCable
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
- transmit_subscription_confirmation
+ ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 9060183249..00511aead5 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -26,10 +26,14 @@ module ActionCable
id_key = data["identifier"]
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ return if subscriptions.key?(id_key)
+
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
- subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ subscription = subscription_klass.new(connection, id_key, id_options)
+ subscriptions[id_key] = subscription
+ subscription.subscribe_to_channel
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end
diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb
index 2bb3214f74..9a3a3581e6 100644
--- a/actioncable/test/channel/base_test.rb
+++ b/actioncable/test/channel/base_test.rb
@@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
@channel = ChatChannel.new @connection, "{id: 1}", id: 1
end
- test "should subscribe to a channel on initialize" do
+ test "should subscribe to a channel" do
+ @channel.subscribe_to_channel
assert_equal 1, @channel.room.id
end
test "on subscribe callbacks" do
+ @channel.subscribe_to_channel
assert @channel.subscribed
end
@@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
end
test "unsubscribing from a channel" do
+ @channel.subscribe_to_channel
+
assert @channel.room
assert @channel.subscribed?
@@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
assert_equal expected, @connection.last_transmission
end
- test "subscription confirmation" do
+ test "do not send subscription confirmation on initialize" do
+ assert_nil @connection.last_transmission
+ end
+
+ test "subscription confirmation on subscribe_to_channel" do
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
+ @channel.subscribe_to_channel
assert_equal expected, @connection.last_transmission
end
@@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
test "notification for transmit_subscription_confirmation" do
begin
+ @channel.subscribe_to_channel
+
events = []
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
events << ActiveSupport::Notifications::Event.new(*args)
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 529abb9535..2ee711fd29 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
@connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil))
channel = ChatChannel.new @connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
channel.unsubscribe_from_channel
assert_equal [], channel.send(:active_periodic_timers)
end
diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb
index faf35ad048..99c4a7603a 100644
--- a/actioncable/test/channel/rejection_test.rb
+++ b/actioncable/test/channel/rejection_test.rb
@@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "subscription rejection" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
+ @channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission
@@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "does not execute action if subscription is rejected" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
+ @channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index da26f81a5c..31dcde2e29 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -53,6 +53,7 @@ module ActionCable::StreamTests
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
@@ -64,6 +65,7 @@ module ActionCable::StreamTests
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = SymbolChannel.new connection, ""
+ channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
@@ -76,6 +78,7 @@ module ActionCable::StreamTests
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, ""
+ channel.subscribe_to_channel
channel.stream_for Room.new(1)
end
end
@@ -84,7 +87,9 @@ module ActionCable::StreamTests
run_in_eventmachine do
connection = TestConnection.new
- ChatChannel.new connection, "{id: 1}", id: 1
+ channel = ChatChannel.new connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
+
assert_nil connection.last_transmission
wait_for_async
@@ -114,7 +119,7 @@ module ActionCable::StreamTests
end
end
- require "action_cable/subscription_adapter/inline"
+ require "action_cable/subscription_adapter/async"
class UserCallbackChannel < ActionCable::Channel::Base
def subscribed
@@ -124,9 +129,16 @@ module ActionCable::StreamTests
end
end
- class StreamEncodingTest < ActionCable::TestCase
+ class MultiChatChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "main_room"
+ stream_from "test_all_rooms"
+ end
+ end
+
+ class StreamFromTest < ActionCable::TestCase
setup do
- @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
+ @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
@@ -153,6 +165,17 @@ module ActionCable::StreamTests
end
end
+ test "subscription confirmation should only be sent out once with muptiple stream_from" do
+ run_in_eventmachine do
+ connection = open_connection
+ expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
+ connection.websocket.expects(:transmit).with(expected.to_json)
+ receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})
+
+ wait_for_async
+ end
+ end
+
private
def subscribe_to(connection, identifiers:)
receive connection, command: "subscribe", identifiers: identifiers
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 39855ea252..af3c7eee1d 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase
end
def wait_for_executor(executor)
+ # do not wait forever, wait 2s
+ timeout = 2
until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1
+ timeout -= 0.1
+ raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end
end
end