aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb6
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
3 files changed, 50 insertions, 26 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index de7ff96d7b..e23789978c 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -129,7 +129,7 @@ module ActionCable
# ignore
end
- def on_close # :nodoc:
+ def on_close(reason, code) # :nodoc:
send_async :handle_close
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index 62dd753646..ef937d7c16 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -37,6 +37,7 @@ module ActionCable
@url = ClientSocket.determine_url(@env)
@driver = @driver_started = nil
+ @close_params = ['', 1006]
@ready_state = CONNECTING
@@ -142,10 +143,7 @@ module ActionCable
return if @ready_state == CLOSED
@ready_state = CLOSED
- reason = @close_params ? @close_params[0] : ''
- code = @close_params ? @close_params[1] : 1006
-
- @event_target.on_close(code, reason)
+ @event_target.on_close(*@close_params)
end
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index f773814973..e6335082d2 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,18 +1,17 @@
require 'nio'
+require 'thread'
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = NIO::Selector.new
+ @nio = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
- Thread.new do
- Thread.current.abort_on_exception = true
- run
- end
+ @spawn_mutex = Mutex.new
+ spawn
end
def attach(io, stream)
@@ -20,34 +19,53 @@ module ActionCable
@map[io] = stream
@nio.register(io, :r)
end
- @nio.wakeup
+ wakeup
end
def detach(io, stream)
@todo << lambda do
- @nio.deregister(io)
+ @nio.deregister io
@map.delete io
end
- @nio.wakeup
+ wakeup
end
def stop
@stopping = true
- @nio.wakeup
+ wakeup if @nio
end
- def run
- loop do
- if @stopping
- @nio.close
- break
- end
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
- until @todo.empty?
- @todo.pop(true).call
+ return true
end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
- if monitors = @nio.select
monitors.each do |monitor|
io = monitor.io
stream = @map[io]
@@ -56,13 +74,21 @@ module ActionCable
stream.receive io.read_nonblock(4096)
rescue IO::WaitReadable
next
- rescue EOFError
- stream.close
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
end
end
end
end
- end
end
end
end