aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/stream_event_loop.rb
diff options
context:
space:
mode:
authorDavid Heinemeier Hansson <david@loudthinking.com>2016-01-24 16:00:15 +0100
committerDavid Heinemeier Hansson <david@loudthinking.com>2016-01-24 16:00:15 +0100
commit53a9da4cf2d99c73bb2b71c84098c9fe32d3cedc (patch)
tree631e1157afb6bfe4e0e5c4ffdc4a69ef6a713f1b /actioncable/lib/action_cable/connection/stream_event_loop.rb
parent3bebb65e86baac1792499c94679a4d4a0cc4acbd (diff)
parent503fe757c7f5f917deab95acdcd421a1dede05c7 (diff)
downloadrails-53a9da4cf2d99c73bb2b71c84098c9fe32d3cedc.tar.gz
rails-53a9da4cf2d99c73bb2b71c84098c9fe32d3cedc.tar.bz2
rails-53a9da4cf2d99c73bb2b71c84098c9fe32d3cedc.zip
Merge pull request #23152 from matthewd/actioncable-concurrent
Eliminate the EventMachine dependency
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream_event_loop.rb')
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
1 files changed, 68 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..f773814973
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,68 @@
+require 'nio'
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = NIO::Selector.new
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ run
+ end
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ @nio.wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister(io)
+ @map.delete io
+ end
+ @nio.wakeup
+ end
+
+ def stop
+ @stopping = true
+ @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ if monitors = @nio.select
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue EOFError
+ stream.close
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end