aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb26
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb3
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb4
-rw-r--r--actioncable/test/channel/base_test.rb9
-rw-r--r--actioncable/test/channel/stream_test.rb29
-rw-r--r--actioncable/test/test_helper.rb4
6 files changed, 60 insertions, 15 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 2e589a2cfa..1d9038b76a 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -144,7 +144,9 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
- @defer_subscription_confirmation = false
+ #
+ # We use atomic fixnum to track the number of waiting tasks to avoid race conditions
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
@@ -153,6 +155,14 @@ module ActionCable
subscribe_to_channel
end
+ # This method is called after subscription has been added to the channel.
+ # Send confirmation here to avoid race conditions when client tries to perform actions
+ # right after receiving confirmation.
+ def registered!
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
# Extract the action name from the passed data and process it via the channel. The process will ensure
# that the action requested is a public method on the channel declared by the user (so not one of the callbacks
# like #subscribed).
@@ -202,17 +212,21 @@ module ActionCable
end
def defer_subscription_confirmation!
- @defer_subscription_confirmation = true
+ @defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation
+ @defer_subscription_confirmation_counter.value.positive?
end
def subscription_confirmation_sent?
@subscription_confirmation_sent
end
+ def registered?
+ @registered
+ end
+
def reject
@reject_subscription = true
end
@@ -235,11 +249,7 @@ module ActionCable
subscribed
end
- if subscription_rejected?
- reject_subscription
- else
- transmit_subscription_confirmation unless defer_subscription_confirmation?
- end
+ reject_subscription if subscription_rejected?
end
def extract_action(data)
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 561750d713..c9b58f3a04 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -84,7 +84,8 @@ module ActionCable
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
- transmit_subscription_confirmation
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 9060183249..ccac802332 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -29,7 +29,9 @@ module ActionCable
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
- subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ subscription = subscription_klass.new(connection, id_key, id_options)
+ subscriptions[id_key] ||= subscription
+ subscription.registered!
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end
diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb
index 2bb3214f74..7a332cdc3c 100644
--- a/actioncable/test/channel/base_test.rb
+++ b/actioncable/test/channel/base_test.rb
@@ -150,8 +150,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
assert_equal expected, @connection.last_transmission
end
- test "subscription confirmation" do
+ test "do not send subscription confirmation on initialize" do
+ assert_nil @connection.last_transmission
+ end
+
+ test "subscription confirmation on registration" do
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
+ @channel.registered!
assert_equal expected, @connection.last_transmission
end
@@ -208,6 +213,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
test "notification for transmit_subscription_confirmation" do
begin
+ @channel.registered!
+
events = []
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
events << ActiveSupport::Notifications::Event.new(*args)
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index da26f81a5c..b1f7560f9a 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -84,7 +84,9 @@ module ActionCable::StreamTests
run_in_eventmachine do
connection = TestConnection.new
- ChatChannel.new connection, "{id: 1}", id: 1
+ channel = ChatChannel.new connection, "{id: 1}", id: 1
+ channel.registered!
+
assert_nil connection.last_transmission
wait_for_async
@@ -101,6 +103,7 @@ module ActionCable::StreamTests
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
+ channel.registered!
channel.send_confirmation
channel.send_confirmation
@@ -114,7 +117,7 @@ module ActionCable::StreamTests
end
end
- require "action_cable/subscription_adapter/inline"
+ require "action_cable/subscription_adapter/async"
class UserCallbackChannel < ActionCable::Channel::Base
def subscribed
@@ -124,9 +127,16 @@ module ActionCable::StreamTests
end
end
- class StreamEncodingTest < ActionCable::TestCase
+ class MultiChatChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "main_room"
+ stream_from "test_all_rooms"
+ end
+ end
+
+ class StreamFromTest < ActionCable::TestCase
setup do
- @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
+ @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
@@ -153,6 +163,17 @@ module ActionCable::StreamTests
end
end
+ test "subscription confirmation should only be sent out once with muptiple stream_from" do
+ run_in_eventmachine do
+ connection = open_connection
+ expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
+ connection.websocket.expects(:transmit).with(expected.to_json)
+ receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})
+
+ wait_for_async
+ end
+ end
+
private
def subscribe_to(connection, identifiers:)
receive connection, command: "subscribe", identifiers: identifiers
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 39855ea252..af3c7eee1d 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase
end
def wait_for_executor(executor)
+ # do not wait forever, wait 2s
+ timeout = 2
until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1
+ timeout -= 0.1
+ raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end
end
end