aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-28 18:46:14 +1030
committerMatthew Draper <matthew@trebex.net>2016-01-30 03:46:37 +1030
commit786ed1b3ad8eeddb911211b67031016730ed55c8 (patch)
tree47ffe90c1ae2cb7c1552e47a10dcb25900ef9486
parentce37de4a19447fc89d2d16f15ba9314fba30d47e (diff)
downloadrails-786ed1b3ad8eeddb911211b67031016730ed55c8.tar.gz
rails-786ed1b3ad8eeddb911211b67031016730ed55c8.tar.bz2
rails-786ed1b3ad8eeddb911211b67031016730ed55c8.zip
Handle more IO errors (especially, ECONNRESET)
Also, address the possibility of the listen thread dying and needing to be respawned. As a bonus, we now defer construction of the thread until we are first given something to monitor.
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
-rw-r--r--actioncable/test/client/echo_channel.rb5
-rw-r--r--actioncable/test/client_test.rb26
3 files changed, 78 insertions, 21 deletions
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index f773814973..e6335082d2 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,18 +1,17 @@
require 'nio'
+require 'thread'
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = NIO::Selector.new
+ @nio = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
- Thread.new do
- Thread.current.abort_on_exception = true
- run
- end
+ @spawn_mutex = Mutex.new
+ spawn
end
def attach(io, stream)
@@ -20,34 +19,53 @@ module ActionCable
@map[io] = stream
@nio.register(io, :r)
end
- @nio.wakeup
+ wakeup
end
def detach(io, stream)
@todo << lambda do
- @nio.deregister(io)
+ @nio.deregister io
@map.delete io
end
- @nio.wakeup
+ wakeup
end
def stop
@stopping = true
- @nio.wakeup
+ wakeup if @nio
end
- def run
- loop do
- if @stopping
- @nio.close
- break
- end
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
- until @todo.empty?
- @todo.pop(true).call
+ return true
end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
- if monitors = @nio.select
monitors.each do |monitor|
io = monitor.io
stream = @map[io]
@@ -56,13 +74,21 @@ module ActionCable
stream.receive io.read_nonblock(4096)
rescue IO::WaitReadable
next
- rescue EOFError
- stream.close
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
end
end
end
end
- end
end
end
end
diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb
index 9a54080d4d..63e35f194a 100644
--- a/actioncable/test/client/echo_channel.rb
+++ b/actioncable/test/client/echo_channel.rb
@@ -7,6 +7,11 @@ class EchoChannel < ActionCable::Channel::Base
transmit(dong: data['message'])
end
+ def delay(data)
+ sleep 1
+ transmit(dong: data['message'])
+ end
+
def bulk(data)
ActionCable.server.broadcast "global", wide: data['message']
end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index b185654c71..7617e93426 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -181,6 +181,15 @@ class ClientTest < ActionCable::TestCase
@ws.close
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
end
+
+ def close!
+ sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach)
+
+ # Force a TCP reset
+ sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii'))
+
+ sock.close
+ end
end
def faye_client(port)
@@ -235,4 +244,21 @@ class ClientTest < ActionCable::TestCase
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
end
end
+
+ def test_disappearing_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.close! # disappear before write
+
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close! # disappear before read
+ end
+ end
end