aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/stream_event_loop.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream_event_loop.rb')
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb58
1 files changed, 49 insertions, 9 deletions
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index e6335082d2..2d1af0ff9f 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,23 +1,33 @@
-require 'nio'
-require 'thread'
+require "nio"
+require "thread"
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = @thread = nil
+ @nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@spawn_mutex = Mutex.new
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
spawn
+ @executor << task
end
def attach(io, stream)
@todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
end
wakeup
end
@@ -26,6 +36,16 @@ module ActionCable
@todo << lambda do
@nio.deregister io
@map.delete io
+ io.close
+ end
+ wakeup
+ end
+
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
end
wakeup
end
@@ -43,6 +63,13 @@ module ActionCable
return if @thread && @thread.status
@nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
@thread = Thread.new { run }
return true
@@ -68,12 +95,25 @@ module ActionCable
monitors.each do |monitor|
io = monitor.io
- stream = @map[io]
+ stream = monitor.value
begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
rescue
# We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if