diff options
Diffstat (limited to 'actioncable')
-rw-r--r-- | actioncable/CHANGELOG.md | 13 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 8 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/base.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/stream.rb | 57 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/stream_event_loop.rb | 46 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 2 | ||||
-rw-r--r-- | actioncable/test/stubs/test_server.rb | 4 |
7 files changed, 114 insertions, 18 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index dec6f7c027..137c88d91b 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,16 @@ +* Prevent race where the client could receive and act upon a + subscription confirmation before the channel's `subscribed` method + completed. + + Fixes #25381. + + *Vladimir Dementyev* + +* Buffer writes to websocket connections, to avoid blocking threads + that could be doing more useful things. + + *Matthew Draper*, *Tinco Andringa* + * Protect against concurrent writes to a websocket connection from multiple threads; the underlying OS write is not always threadsafe. diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e480b93df0..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -69,8 +69,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) @@ -94,8 +94,8 @@ module ActionCable # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. # - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b0615b08a1..4c7fcc1434 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -105,7 +105,7 @@ module ActionCable worker_pool.async_invoke(self, method, *arguments) end - # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. + # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>. # This can be returned by a health check against the connection. def statistics { diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 5a2aace0ba..d66e1b4e41 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -14,6 +14,9 @@ module ActionCable @rack_hijack_io = nil @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -30,14 +33,62 @@ module ActionCable end def write(data) - @write_lock.synchronize do - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 106b948c45..eec24638b6 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -5,7 +5,7 @@ module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +20,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -39,6 +40,15 @@ module ActionCable wakeup end + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end + end + wakeup + end + def stop @stopping = true wakeup if @nio @@ -52,6 +62,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +94,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index c700297a8d..dd059a553b 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -54,7 +54,7 @@ module ActionCable # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out - # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`. + # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>. # # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index b64ff33789..02be72d0cb 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -28,7 +28,9 @@ class TestServer @event_loop ||= if @config.use_faye ActionCable::Connection::FayeEventLoop.new else - ActionCable::Connection::StreamEventLoop.new + ActionCable::Connection::StreamEventLoop.new.tap do |loop| + loop.instance_variable_set(:@executor, Concurrent.global_io_executor) + end end end |