diff options
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream_event_loop.rb')
-rw-r--r-- | actioncable/lib/action_cable/connection/stream_event_loop.rb | 53 |
1 files changed, 43 insertions, 10 deletions
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 2abad09c03..d95afc50ba 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -1,11 +1,13 @@ -require 'nio' -require 'thread' +# frozen_string_literal: true + +require "nio" +require "thread" module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +22,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -35,6 +38,16 @@ module ActionCable @todo << lambda do @nio.deregister io @map.delete io + io.close + end + wakeup + end + + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end end wakeup end @@ -52,6 +65,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +97,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if |