aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-09-28 06:44:23 +0930
committerMatthew Draper <matthew@trebex.net>2016-09-28 06:44:23 +0930
commit5d92089bcae42275953ad310b24f5dd66ff0ec6e (patch)
treed97c8b35275ffd6353840090d3e261cc931cc917
parent21ecf42730de2004a95d913038c1f417c24496be (diff)
downloadrails-5d92089bcae42275953ad310b24f5dd66ff0ec6e.tar.gz
rails-5d92089bcae42275953ad310b24f5dd66ff0ec6e.tar.bz2
rails-5d92089bcae42275953ad310b24f5dd66ff0ec6e.zip
Buffer writes to the cable sockets
Otherwise, they can sometimes block, leading to reduced system throughput.
-rw-r--r--actioncable/CHANGELOG.md5
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb57
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb46
-rw-r--r--actioncable/test/stubs/test_server.rb4
4 files changed, 100 insertions, 12 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index dec6f7c027..2e6c2f2afc 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,8 @@
+* Buffer writes to websocket connections, to avoid blocking threads
+ that could be doing more useful things.
+
+ *Matthew Draper*, *Tinco Andringa*
+
* Protect against concurrent writes to a websocket connection from
multiple threads; the underlying OS write is not always threadsafe.
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 5a2aace0ba..d66e1b4e41 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -14,6 +14,9 @@ module ActionCable
@rack_hijack_io = nil
@write_lock = Mutex.new
+
+ @write_head = nil
+ @write_buffer = Queue.new
end
def each(&callback)
@@ -30,14 +33,62 @@ module ActionCable
end
def write(data)
- @write_lock.synchronize do
- return @rack_hijack_io.write(data) if @rack_hijack_io
- return @stream_send.call(data) if @stream_send
+ if @stream_send
+ return @stream_send.call(data)
end
+
+ if @write_lock.try_lock
+ begin
+ if @write_head.nil? && @write_buffer.empty?
+ written = @rack_hijack_io.write_nonblock(data, exception: false)
+
+ case written
+ when :wait_writable
+ # proceed below
+ when data.bytesize
+ return data.bytesize
+ else
+ @write_head = data.byteslice(written, data.bytesize)
+ @event_loop.writes_pending @rack_hijack_io
+
+ return data.bytesize
+ end
+ end
+ ensure
+ @write_lock.unlock
+ end
+ end
+
+ @write_buffer << data
+ @event_loop.writes_pending @rack_hijack_io
+
+ data.bytesize
rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
+ def flush_write_buffer
+ @write_lock.synchronize do
+ loop do
+ if @write_head.nil?
+ return true if @write_buffer.empty?
+ @write_head = @write_buffer.pop
+ end
+
+ written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
+ case written
+ when :wait_writable
+ return false
+ when @write_head.bytesize
+ @write_head = nil
+ else
+ @write_head = @write_head.byteslice(written, @write_head.bytesize)
+ return false
+ end
+ end
+ end
+ end
+
def receive(data)
@socket_object.parse(data)
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index 106b948c45..eec24638b6 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -5,7 +5,7 @@ module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = @thread = nil
+ @nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@@ -20,13 +20,14 @@ module ActionCable
def post(task = nil, &block)
task ||= block
- Concurrent.global_io_executor << task
+ spawn
+ @executor << task
end
def attach(io, stream)
@todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
end
wakeup
end
@@ -39,6 +40,15 @@ module ActionCable
wakeup
end
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
+ end
+ wakeup
+ end
+
def stop
@stopping = true
wakeup if @nio
@@ -52,6 +62,13 @@ module ActionCable
return if @thread && @thread.status
@nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
@thread = Thread.new { run }
return true
@@ -77,12 +94,25 @@ module ActionCable
monitors.each do |monitor|
io = monitor.io
- stream = @map[io]
+ stream = monitor.value
begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
rescue
# We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index b64ff33789..02be72d0cb 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -28,7 +28,9 @@ class TestServer
@event_loop ||= if @config.use_faye
ActionCable::Connection::FayeEventLoop.new
else
- ActionCable::Connection::StreamEventLoop.new
+ ActionCable::Connection::StreamEventLoop.new.tap do |loop|
+ loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
+ end
end
end