From 3e68d8b872b48ecb45268a7e5fcb937e68f2724f Mon Sep 17 00:00:00 2001 From: palkan Date: Wed, 21 Sep 2016 02:57:10 +0300 Subject: Add Channel#ensure_confirmation_sent; call #subscribe_to_channel after initializing --- actioncable/lib/action_cable/channel/base.rb | 42 ++++++++++------------ actioncable/lib/action_cable/channel/streams.rb | 3 +- .../lib/action_cable/connection/subscriptions.rb | 6 ++-- actioncable/test/channel/base_test.rb | 12 ++++--- actioncable/test/channel/periodic_timers_test.rb | 1 + actioncable/test/channel/rejection_test.rb | 2 ++ actioncable/test/channel/stream_test.rb | 6 ++-- 7 files changed, 39 insertions(+), 33 deletions(-) diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 1d9038b76a..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -145,22 +145,13 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. # - # We use atomic fixnum to track the number of waiting tasks to avoid race conditions + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel - end - - # This method is called after subscription has been added to the channel. - # Send confirmation here to avoid race conditions when client tries to perform actions - # right after receiving confirmation. - def registered! - @defer_subscription_confirmation_counter.decrement - transmit_subscription_confirmation unless defer_subscription_confirmation? end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -179,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -211,22 +213,24 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation_counter.value.positive? + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @subscription_confirmation_sent end - def registered? - @registered - end - def reject @reject_subscription = true end @@ -244,14 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - reject_subscription if subscription_rejected? - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index c9b58f3a04..e480b93df0 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,8 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - @defer_subscription_confirmation_counter.decrement - transmit_subscription_confirmation unless defer_subscription_confirmation? + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index ccac802332..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,12 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass subscription = subscription_klass.new(connection, id_key, id_options) - subscriptions[id_key] ||= subscription - subscription.registered! + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index 7a332cdc3c..9a3a3581e6 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase @channel = ChatChannel.new @connection, "{id: 1}", id: 1 end - test "should subscribe to a channel on initialize" do + test "should subscribe to a channel" do + @channel.subscribe_to_channel assert_equal 1, @channel.room.id end test "on subscribe callbacks" do + @channel.subscribe_to_channel assert @channel.subscribed end @@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "unsubscribing from a channel" do + @channel.subscribe_to_channel + assert @channel.room assert @channel.subscribed? @@ -154,9 +158,9 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase assert_nil @connection.last_transmission end - test "subscription confirmation on registration" do + test "subscription confirmation on subscribe_to_channel" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } - @channel.registered! + @channel.subscribe_to_channel assert_equal expected, @connection.last_transmission end @@ -213,7 +217,7 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit_subscription_confirmation" do begin - @channel.registered! + @channel.subscribe_to_channel events = [] ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 529abb9535..2ee711fd29 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) channel = ChatChannel.new @connection, "{id: 1}", id: 1 + channel.subscribe_to_channel channel.unsubscribe_from_channel assert_equal [], channel.send(:active_periodic_timers) end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index faf35ad048..99c4a7603a 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "subscription rejection" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission @@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "does not execute action if subscription is rejected" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index b1f7560f9a..31dcde2e29 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -53,6 +53,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -64,6 +65,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = SymbolChannel.new connection, "" + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -76,6 +78,7 @@ module ActionCable::StreamTests connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" + channel.subscribe_to_channel channel.stream_for Room.new(1) end end @@ -85,7 +88,7 @@ module ActionCable::StreamTests connection = TestConnection.new channel = ChatChannel.new connection, "{id: 1}", id: 1 - channel.registered! + channel.subscribe_to_channel assert_nil connection.last_transmission @@ -103,7 +106,6 @@ module ActionCable::StreamTests connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" - channel.registered! channel.send_confirmation channel.send_confirmation -- cgit v1.2.3