diff options
Diffstat (limited to 'actioncable')
29 files changed, 332 insertions, 284 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index dec6f7c027..137c88d91b 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,16 @@ +* Prevent race where the client could receive and act upon a + subscription confirmation before the channel's `subscribed` method + completed. + + Fixes #25381. + + *Vladimir Dementyev* + +* Buffer writes to websocket connections, to avoid blocking threads + that could be doing more useful things. + + *Matthew Draper*, *Tinco Andringa* + * Protect against concurrent writes to a websocket connection from multiple threads; the underlying OS write is not always threadsafe. diff --git a/actioncable/README.md b/actioncable/README.md index 28e2602cbf..a0b7412dd4 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -167,7 +167,7 @@ App.cable.subscriptions.create "AppearanceChannel", buttonSelector = "[data-behavior~=appear_away]" install: -> - $(document).on "page:change.appearance", => + $(document).on "turbolinks:load.appearance", => @appear() $(document).on "click.appearance", buttonSelector, => diff --git a/actioncable/Rakefile b/actioncable/Rakefile index 34649547a2..648de57004 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -21,7 +21,7 @@ namespace :test do task :isolated do Dir.glob("test/**/*_test.rb").all? do |file| sh(Gem.ruby, "-w", "-Ilib:test", file) - end or raise "Failures" + end || raise("Failures") end task :integration do diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -201,12 +213,18 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -230,18 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -69,8 +69,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end @@ -94,8 +94,8 @@ module ActionCable # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. # - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 5f813cf8e0..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,8 +8,6 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel - autoload :FayeClientSocket - autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b0615b08a1..06f4f5edd3 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -105,7 +105,7 @@ module ActionCable worker_pool.async_invoke(self, method, *arguments) end - # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. + # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>. # This can be returned by a health check against the connection. def statistics { diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index c48f000d9c..70a2bbecb1 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -89,7 +89,7 @@ module ActionCable code ||= 1000 reason ||= "" - unless code == 1000 or (code >= 3000 and code <= 4999) + unless code == 1000 || (code >= 3000 && code <= 4999) raise ArgumentError, "Failed to execute 'close' on WebSocket: " + "The code must be either 1000, or between 3000 and 4999. " + "#{code} is neither." diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb deleted file mode 100644 index 06e92c5d52..0000000000 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ /dev/null @@ -1,48 +0,0 @@ -require "faye/websocket" - -module ActionCable - module Connection - class FayeClientSocket - def initialize(env, event_target, stream_event_loop, protocols) - @env = env - @event_target = event_target - @protocols = protocols - - @faye = nil - end - - def alive? - @faye && @faye.ready_state == Faye::WebSocket::API::OPEN - end - - def transmit(data) - connect - @faye.send data - end - - def close - @faye && @faye.close - end - - def protocol - @faye && @faye.protocol - end - - def rack_response - connect - @faye.rack_response - end - - private - def connect - return if @faye - @faye = Faye::WebSocket.new(@env, @protocols) - - @faye.on(:open) { |event| @event_target.on_open } - @faye.on(:message) { |event| @event_target.on_message(event.data) } - @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } - @faye.on(:error) { |event| @event_target.on_error(event.message) } - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb deleted file mode 100644 index cfbe26ee6a..0000000000 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ /dev/null @@ -1,44 +0,0 @@ -require "thread" - -require "eventmachine" -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -module ActionCable - module Connection - class FayeEventLoop - @@mutex = Mutex.new - - def timer(interval, &block) - ensure_reactor_running - EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) - end - - def post(task = nil, &block) - task ||= block - - ensure_reactor_running - ::EM.next_tick(&task) - end - - private - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end - - class EMTimer - def initialize(inner) - @inner = inner - end - - def shutdown - @inner.cancel - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 5a2aace0ba..e620b93845 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -14,6 +14,9 @@ module ActionCable @rack_hijack_io = nil @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -30,14 +33,62 @@ module ActionCable end def write(data) - @write_lock.synchronize do - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end @@ -55,7 +106,6 @@ module ActionCable def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io.close @rack_hijack_io = nil end end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 106b948c45..2d1af0ff9f 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -5,7 +5,7 @@ module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +20,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -35,6 +36,16 @@ module ActionCable @todo << lambda do @nio.deregister io @map.delete io + io.close + end + wakeup + end + + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end end wakeup end @@ -52,6 +63,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +95,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,10 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 52d8daad4b..382141b89f 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index c700297a8d..419eccd73c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -37,9 +37,13 @@ module ActionCable connections.each(&:close) @mutex.synchronize do - worker_pool.halt if @worker_pool - + # Shutdown the worker pool + @worker_pool.halt if @worker_pool @worker_pool = nil + + # Shutdown the pub/sub adapter + @pubsub.shutdown if @pubsub + @pubsub = nil end end @@ -49,12 +53,12 @@ module ActionCable end def event_loop - @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out - # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`. + # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>. # # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 7153593d4c..dc146f07b0 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,7 +4,7 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :use_faye, :connection_class, :worker_pool_size + attr_accessor :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :cable, :url, :mount_path @@ -35,22 +35,6 @@ module ActionCable adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end - - def event_loop_class - if use_faye - ActionCable::Connection::FayeEventLoop - else - ActionCable::Connection::StreamEventLoop - end - end - - def client_socket_class - if use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - end end end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 7460472551..43639c27af 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -25,7 +25,7 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @executor.kill + @executor.shutdown end def stopping? diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index 2bb3214f74..9a3a3581e6 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase @channel = ChatChannel.new @connection, "{id: 1}", id: 1 end - test "should subscribe to a channel on initialize" do + test "should subscribe to a channel" do + @channel.subscribe_to_channel assert_equal 1, @channel.room.id end test "on subscribe callbacks" do + @channel.subscribe_to_channel assert @channel.subscribed end @@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "unsubscribing from a channel" do + @channel.subscribe_to_channel + assert @channel.room assert @channel.subscribed? @@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase assert_equal expected, @connection.last_transmission end - test "subscription confirmation" do + test "do not send subscription confirmation on initialize" do + assert_nil @connection.last_transmission + end + + test "subscription confirmation on subscribe_to_channel" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + @channel.subscribe_to_channel assert_equal expected, @connection.last_transmission end @@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit_subscription_confirmation" do begin + @channel.subscribe_to_channel + events = [] ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 529abb9535..2ee711fd29 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) channel = ChatChannel.new @connection, "{id: 1}", id: 1 + channel.subscribe_to_channel channel.unsubscribe_from_channel assert_equal [], channel.send(:active_periodic_timers) end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index faf35ad048..99c4a7603a 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "subscription rejection" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission @@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "does not execute action if subscription is rejected" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index da26f81a5c..31dcde2e29 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -53,6 +53,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -64,6 +65,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = SymbolChannel.new connection, "" + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -76,6 +78,7 @@ module ActionCable::StreamTests connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" + channel.subscribe_to_channel channel.stream_for Room.new(1) end end @@ -84,7 +87,9 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", id: 1 + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel + assert_nil connection.last_transmission wait_for_async @@ -114,7 +119,7 @@ module ActionCable::StreamTests end end - require "action_cable/subscription_adapter/inline" + require "action_cable/subscription_adapter/async" class UserCallbackChannel < ActionCable::Channel::Base def subscribed @@ -124,9 +129,16 @@ module ActionCable::StreamTests end end - class StreamEncodingTest < ActionCable::TestCase + class MultiChatChannel < ActionCable::Channel::Base + def subscribed + stream_from "main_room" + stream_from "test_all_rooms" + end + end + + class StreamFromTest < ActionCable::TestCase setup do - @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end @@ -153,6 +165,17 @@ module ActionCable::StreamTests end end + test "subscription confirmation should only be sent out once with muptiple stream_from" do + run_in_eventmachine do + connection = open_connection + expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" } + connection.websocket.expects(:transmit).with(expected.to_json) + receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {}) + + wait_for_async + end + end + private def subscribe_to(connection, identifiers:) receive connection, command: "subscribe", identifiers: identifiers diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 2e3821828f..db10a7ad16 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -1,13 +1,31 @@ require "test_helper" require "concurrent" -require "faye/websocket" +require "websocket-client-simple" require "json" require "active_support/hash_with_indifferent_access" +#### +# 😷 Warning suppression 😷 +WebSocket::Frame::Handler::Handler03.prepend Module.new { + def initialize(*) + @application_data_buffer = nil + super + end +} + +WebSocket::Frame::Data.prepend Module.new { + def initialize(*) + @masking_key = nil + super + end +} +# +#### + class ClientTest < ActionCable::TestCase - WAIT_WHEN_EXPECTING_EVENT = 8 + WAIT_WHEN_EXPECTING_EVENT = 2 WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 class EchoChannel < ActionCable::Channel::Base @@ -39,20 +57,9 @@ class ClientTest < ActionCable::TestCase server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async") - server.config.use_faye = ENV["FAYE"].present? # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true - - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - - # faye-websocket is warning-rich - @previous_verbose, $VERBOSE = $VERBOSE, nil - end - - def teardown - $VERBOSE = @previous_verbose end def with_puma_server(rack_app = ActionCable.server, port = 3099) @@ -73,44 +80,49 @@ class ClientTest < ActionCable::TestCase attr_reader :pings def initialize(port) - @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") - @messages = Queue.new - @closed = Concurrent::Event.new - @has_messages = Concurrent::Semaphore.new(0) - @pings = 0 - - open = Concurrent::Event.new - error = nil - - @ws.on(:error) do |event| - if open.set? - @messages << RuntimeError.new(event.message) - else - error = event.message - open.set + messages = @messages = Queue.new + closed = @closed = Concurrent::Event.new + has_messages = @has_messages = Concurrent::Semaphore.new(0) + pings = @pings = Concurrent::AtomicFixnum.new(0) + + open = Concurrent::Promise.new + + @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws| + ws.on(:error) do |event| + event = RuntimeError.new(event.message) unless event.is_a?(Exception) + + if open.pending? + open.fail(event) + else + messages << event + has_messages.release + end end - end - @ws.on(:open) do |event| - open.set - end + ws.on(:open) do |event| + open.set(true) + end - @ws.on(:message) do |event| - message = JSON.parse(event.data) - if message["type"] == "ping" - @pings += 1 - else - @messages << message - @has_messages.release + ws.on(:message) do |event| + if event.type == :close + closed.set + else + message = JSON.parse(event.data) + if message["type"] == "ping" + pings.increment + else + messages << message + has_messages.release + end + end end - end - @ws.on(:close) do |event| - @closed.set + ws.on(:close) do |event| + closed.set + end end - open.wait(WAIT_WHEN_EXPECTING_EVENT) - raise error if error + open.wait!(WAIT_WHEN_EXPECTING_EVENT) end def read_message @@ -161,13 +173,17 @@ class ClientTest < ActionCable::TestCase end end - def faye_client(port) + def websocket_client(port) SyncClient.new(port) end + def concurrently(enum) + enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!) + end + def test_single_client with_puma_server do |port| - c = faye_client(port) + c = websocket_client(port) assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) @@ -179,12 +195,12 @@ class ClientTest < ActionCable::TestCase def test_interacting_clients with_puma_server do |port| - clients = 10.times.map { faye_client(port) } + clients = concurrently(10.times) { websocket_client(port) } barrier_1 = Concurrent::CyclicBarrier.new(clients.size) barrier_2 = Concurrent::CyclicBarrier.new(clients.size) - clients.map { |c| Concurrent::Future.execute { + concurrently(clients) do |c| assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message) @@ -194,38 +210,38 @@ class ClientTest < ActionCable::TestCase c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello") barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size - } }.each(&:wait!) + end - clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_many_clients with_puma_server do |port| - clients = 100.times.map { faye_client(port) } + clients = concurrently(100.times) { websocket_client(port) } - clients.map { |c| Concurrent::Future.execute { + concurrently(clients) do |c| assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message) c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message) - } }.each(&:wait!) + end - clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_disappearing_client with_puma_server do |port| - c = faye_client(port) + c = websocket_client(port) assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello") c.close # disappear before write - c = faye_client(port) + c = websocket_client(port) assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) @@ -240,7 +256,7 @@ class ClientTest < ActionCable::TestCase app = ActionCable.server identifier = JSON.generate(channel: "ClientTest::EchoChannel") - c = faye_client(port) + c = websocket_client(port) assert_equal({ "type" => "welcome" }, c.read_message) c.send_message command: "subscribe", identifier: identifier assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) @@ -261,7 +277,7 @@ class ClientTest < ActionCable::TestCase def test_server_restart with_puma_server do |port| - c = faye_client(port) + c = websocket_client(port) assert_equal({ "type" => "welcome" }, c.read_message) c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb index 5043a63370..bc3ff6a3d7 100644 --- a/actioncable/test/connection/client_socket_test.rb +++ b/actioncable/test/connection/client_socket_test.rb @@ -33,8 +33,6 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase end test "delegate socket errors to on_error handler" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection @@ -49,16 +47,16 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase end test "closes hijacked i/o socket at shutdown" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection client = connection.websocket.send(:websocket) + event = Concurrent::Event.new client.instance_variable_get("@stream") .instance_variable_get("@rack_hijack_io") - .expects(:close) + .define_singleton_method(:close) { event.set } connection.close + event.wait end end @@ -67,7 +65,13 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase env = Rack::MockRequest.env_for "/test", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com" - env["rack.hijack"] = -> { env["rack.hijack_io"] = StringIO.new } + io = \ + begin + Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM, 0).first + rescue + StringIO.new + end + env["rack.hijack"] = -> { env["rack.hijack_io"] = io } Connection.new(@server, env).tap do |connection| connection.process diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb index 4128b32f15..36e1d3c095 100644 --- a/actioncable/test/connection/stream_test.rb +++ b/actioncable/test/connection/stream_test.rb @@ -34,8 +34,6 @@ class ActionCable::Connection::StreamTest < ActionCable::TestCase [ EOFError, Errno::ECONNRESET ].each do |closed_exception| test "closes socket on #{closed_exception}" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection diff --git a/actioncable/test/server/base_test.rb b/actioncable/test/server/base_test.rb new file mode 100644 index 0000000000..f0a51c5a7d --- /dev/null +++ b/actioncable/test/server/base_test.rb @@ -0,0 +1,33 @@ +require "test_helper" +require "stubs/test_server" +require "active_support/core_ext/hash/indifferent_access" + +class BaseTest < ActiveSupport::TestCase + def setup + @server = ActionCable::Server::Base.new + @server.config.cable = { adapter: "async" }.with_indifferent_access + end + + class FakeConnection + def close + end + end + + test "#restart closes all open connections" do + conn = FakeConnection.new + @server.add_connection(conn) + + conn.expects(:close) + @server.restart + end + + test "#restart shuts down worker pool" do + @server.worker_pool.expects(:halt) + @server.restart + end + + test "#restart shuts down pub/sub adapter" do + @server.pubsub.expects(:shutdown) + @server.restart + end +end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index b64ff33789..5bf2a151dc 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -10,12 +10,6 @@ class TestServer @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) @config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter) - @config.use_faye = ENV["FAYE"].present? - @config.client_socket_class = if @config.use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end @mutex = Monitor.new end @@ -25,10 +19,8 @@ class TestServer end def event_loop - @event_loop ||= if @config.use_faye - ActionCable::Connection::FayeEventLoop.new - else - ActionCable::Connection::StreamEventLoop.new + @event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop| + loop.instance_variable_set(:@executor, Concurrent.global_io_executor) end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 1538157995..3aa88c2caa 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -11,7 +11,7 @@ module CommonSubscriptionAdapterTest def setup server = ActionCable::Server::Base.new server.config.cable = cable_config.with_indifferent_access - server.config.use_faye = ENV["FAYE"].present? + server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } adapter_klass = server.config.pubsub_adapter @@ -20,7 +20,7 @@ module CommonSubscriptionAdapterTest end def teardown - [@rx_adapter, @tx_adapter].uniq.each(&:shutdown) + [@rx_adapter, @tx_adapter].uniq.compact.each(&:shutdown) end def subscribe_as_queue(channel, adapter = @rx_adapter) diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb index d6ca0e77cb..f316bc46ef 100644 --- a/actioncable/test/subscription_adapter/evented_redis_test.rb +++ b/actioncable/test/subscription_adapter/evented_redis_test.rb @@ -12,6 +12,14 @@ class EventedRedisAdapterTest < ActionCable::TestCase end def teardown + super + + # Ensure EM is shut down before we re-enable warnings + EventMachine.reactor_thread.tap do |thread| + EventMachine.stop + thread.join + end + $VERBOSE = @previous_verbose end diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 39855ea252..a47032753b 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,41 +13,7 @@ end # Require all the stubs and models Dir[File.dirname(__FILE__) + "/stubs/*.rb"].each { |file| require file } -if ENV["FAYE"].present? - require "faye/websocket" - class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end - end -end - -module EventMachineConcurrencyHelpers - def wait_for_async - EM.run_deferred_callbacks - end - - def run_in_eventmachine - failure = nil - EM.run do - begin - yield - rescue => ex - failure = ex - ensure - wait_for_async - EM.stop if EM.reactor_running? - end - end - raise failure if failure - end -end - -module ConcurrentRubyConcurrencyHelpers +class ActionCable::TestCase < ActiveSupport::TestCase def wait_for_async wait_for_executor Concurrent.global_io_executor end @@ -56,18 +22,14 @@ module ConcurrentRubyConcurrencyHelpers yield wait_for_async end -end - -class ActionCable::TestCase < ActiveSupport::TestCase - if ENV["FAYE"].present? - include EventMachineConcurrencyHelpers - else - include ConcurrentRubyConcurrencyHelpers - end def wait_for_executor(executor) + # do not wait forever, wait 2s + timeout = 2 until executor.completed_task_count == executor.scheduled_task_count sleep 0.1 + timeout -= 0.1 + raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0 end end end |