aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/stream_event_loop.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream_event_loop.rb')
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb53
1 files changed, 43 insertions, 10 deletions
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index 2abad09c03..d95afc50ba 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,11 +1,13 @@
-require 'nio'
-require 'thread'
+# frozen_string_literal: true
+
+require "nio"
+require "thread"
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = @thread = nil
+ @nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@@ -20,13 +22,14 @@ module ActionCable
def post(task = nil, &block)
task ||= block
- Concurrent.global_io_executor << task
+ spawn
+ @executor << task
end
def attach(io, stream)
@todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
end
wakeup
end
@@ -35,6 +38,16 @@ module ActionCable
@todo << lambda do
@nio.deregister io
@map.delete io
+ io.close
+ end
+ wakeup
+ end
+
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
end
wakeup
end
@@ -52,6 +65,13 @@ module ActionCable
return if @thread && @thread.status
@nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
@thread = Thread.new { run }
return true
@@ -77,12 +97,25 @@ module ActionCable
monitors.each do |monitor|
io = monitor.io
- stream = @map[io]
+ stream = monitor.value
begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
rescue
# We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if