aboutsummaryrefslogtreecommitdiffstats
path: root/actionpack
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2014-06-01 10:35:00 +0930
committerMatthew Draper <matthew@trebex.net>2014-06-08 07:21:14 +0930
commit6a89850dfe1e8c8331fd8482525aa4b9b2530cad (patch)
tree1f46c8533080b07a1f0ab9cef99ad45fc3be961c /actionpack
parentdc73e39b4d40f0965b000f84568f77f126ec8290 (diff)
downloadrails-6a89850dfe1e8c8331fd8482525aa4b9b2530cad.tar.gz
rails-6a89850dfe1e8c8331fd8482525aa4b9b2530cad.tar.bz2
rails-6a89850dfe1e8c8331fd8482525aa4b9b2530cad.zip
Handle client disconnect during live streaming
.. even when the producer is blocked for a write.
Diffstat (limited to 'actionpack')
-rw-r--r--actionpack/CHANGELOG.md5
-rw-r--r--actionpack/lib/action_controller/metal/live.rb48
-rw-r--r--actionpack/lib/action_dispatch/http/response.rb60
-rw-r--r--actionpack/test/controller/live_stream_test.rb89
4 files changed, 189 insertions, 13 deletions
diff --git a/actionpack/CHANGELOG.md b/actionpack/CHANGELOG.md
index 3b20aec20d..dfd5ddeedf 100644
--- a/actionpack/CHANGELOG.md
+++ b/actionpack/CHANGELOG.md
@@ -1,3 +1,8 @@
+* Ensure the controller is always notified as soon as the client disconnects
+ during live streaming, even when the controller is blocked on a write.
+
+ *Nicholas Jakobsen*, *Matthew Draper*
+
* Routes specifying 'to:' must be a string that contains a "#" or a rack
application. Use of a symbol should be replaced with `action: symbol`.
Use of a string without a "#" should be replaced with `controller: string`.
diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb
index 4c0554d27b..67875141cb 100644
--- a/actionpack/lib/action_controller/metal/live.rb
+++ b/actionpack/lib/action_controller/metal/live.rb
@@ -107,12 +107,25 @@ module ActionController
end
end
+ class ClientDisconnected < RuntimeError
+ end
+
class Buffer < ActionDispatch::Response::Buffer #:nodoc:
include MonitorMixin
+ # Ignore that the client has disconnected.
+ #
+ # If this value is `true`, calling `write` after the client
+ # disconnects will result in the written content being silently
+ # discarded. If this value is `false` (the default), a
+ # ClientDisconnected exception will be raised.
+ attr_accessor :ignore_disconnect
+
def initialize(response)
@error_callback = lambda { true }
@cv = new_cond
+ @aborted = false
+ @ignore_disconnect = false
super(response, SizedQueue.new(10))
end
@@ -123,6 +136,17 @@ module ActionController
end
super
+
+ unless connected?
+ @buf.clear
+
+ unless @ignore_disconnect
+ # Raise ClientDisconnected, which is a RuntimeError (not an
+ # IOError), because that's more appropriate for something beyond
+ # the developer's control.
+ raise ClientDisconnected, "client disconnected"
+ end
+ end
end
def each
@@ -133,6 +157,10 @@ module ActionController
@response.sent!
end
+ # Write a 'close' event to the buffer; the producer/writing thread
+ # uses this to notify us that it's finished supplying content.
+ #
+ # See also #abort.
def close
synchronize do
super
@@ -141,6 +169,26 @@ module ActionController
end
end
+ # Inform the producer/writing thread that the client has
+ # disconnected; the reading thread is no longer interested in
+ # anything that's being written.
+ #
+ # See also #close.
+ def abort
+ synchronize do
+ @aborted = true
+ @buf.clear
+ end
+ end
+
+ # Is the client still connected and waiting for content?
+ #
+ # The result of calling `write` when this is `false` is determined
+ # by `ignore_disconnect`.
+ def connected?
+ !@aborted
+ end
+
def await_close
synchronize do
@cv.wait_until { @closed }
diff --git a/actionpack/lib/action_dispatch/http/response.rb b/actionpack/lib/action_dispatch/http/response.rb
index eaea93b730..2fab6be1a5 100644
--- a/actionpack/lib/action_dispatch/http/response.rb
+++ b/actionpack/lib/action_dispatch/http/response.rb
@@ -97,6 +97,9 @@ module ActionDispatch # :nodoc:
x
end
+ def abort
+ end
+
def close
@response.commit!
@closed = true
@@ -207,18 +210,6 @@ module ActionDispatch # :nodoc:
end
alias_method :status_message, :message
- def respond_to?(method, include_private = false)
- if method.to_s == 'to_path'
- stream.respond_to?(method)
- else
- super
- end
- end
-
- def to_path
- stream.to_path
- end
-
# Returns the content of the response as a string. This contains the contents
# of any calls to <tt>render</tt>.
def body
@@ -271,6 +262,17 @@ module ActionDispatch # :nodoc:
stream.close if stream.respond_to?(:close)
end
+ def abort
+ if stream.respond_to?(:abort)
+ stream.abort
+ elsif stream.respond_to?(:close)
+ # `stream.close` should really be reserved for a close from the
+ # other direction, but we must fall back to it for
+ # compatibility.
+ stream.close
+ end
+ end
+
# Turns the Response into a Rack-compatible array of the status, headers,
# and body.
def to_a
@@ -337,6 +339,38 @@ module ActionDispatch # :nodoc:
!@sending_file && @charset != false
end
+ class RackBody
+ def initialize(response)
+ @response = response
+ end
+
+ def each(*args, &block)
+ @response.each(*args, &block)
+ end
+
+ def close
+ # Rack "close" maps to Response#abort, and *not* Response#close
+ # (which is used when the controller's finished writing)
+ @response.abort
+ end
+
+ def body
+ @response.body
+ end
+
+ def respond_to?(method, include_private = false)
+ if method.to_s == 'to_path'
+ @response.stream.respond_to?(method)
+ else
+ super
+ end
+ end
+
+ def to_path
+ @response.stream.to_path
+ end
+ end
+
def rack_response(status, header)
assign_default_content_type_and_charset!(header)
handle_conditional_get!
@@ -347,7 +381,7 @@ module ActionDispatch # :nodoc:
header.delete CONTENT_TYPE
[status, header, []]
else
- [status, header, Rack::BodyProxy.new(self){}]
+ [status, header, RackBody.new(self)]
end
end
end
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb
index 1dca36374a..0500b7c789 100644
--- a/actionpack/test/controller/live_stream_test.rb
+++ b/actionpack/test/controller/live_stream_test.rb
@@ -202,6 +202,39 @@ module ActionController
response.stream.write ''
response.stream.write params[:widget][:didnt_check_for_nil]
end
+
+ def overfill_buffer_and_die
+ # Write until the buffer is full. It doesn't expose that
+ # information directly, so we must hard-code its size:
+ 10.times do
+ response.stream.write '.'
+ end
+ # .. plus one more, because the #each frees up a slot:
+ response.stream.write '.'
+
+ latch.release
+
+ # This write will block, and eventually raise
+ response.stream.write 'x'
+
+ 20.times do
+ response.stream.write '.'
+ end
+ end
+
+ def ignore_client_disconnect
+ response.stream.ignore_disconnect = true
+
+ response.stream.write '' # commit
+
+ # These writes will be ignored
+ 15.times do
+ response.stream.write 'x'
+ end
+
+ logger.info 'Work complete'
+ latch.release
+ end
end
tests TestController
@@ -264,6 +297,62 @@ module ActionController
assert t.join(3), 'timeout expired before the thread terminated'
end
+ def test_abort_with_full_buffer
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+
+ @request.parameters[:format] = 'plain'
+ @controller.request = @request
+ @controller.response = @response
+
+ got_error = ActiveSupport::Concurrency::Latch.new
+ @response.stream.on_error do
+ ActionController::Base.logger.warn 'Error while streaming'
+ got_error.release
+ end
+
+ t = Thread.new(@response) { |resp|
+ resp.await_commit
+ _, _, body = resp.to_a
+ body.each do |part|
+ @controller.latch.await
+ body.close
+ break
+ end
+ }
+
+ capture_log_output do |output|
+ @controller.process :overfill_buffer_and_die
+ t.join
+ got_error.await
+ assert_match 'Error while streaming', output.rewind && output.read
+ end
+ end
+
+ def test_ignore_client_disconnect
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+
+ @controller.request = @request
+ @controller.response = @response
+
+ t = Thread.new(@response) { |resp|
+ resp.await_commit
+ _, _, body = resp.to_a
+ body.each do |part|
+ body.close
+ break
+ end
+ }
+
+ capture_log_output do |output|
+ @controller.process :ignore_client_disconnect
+ t.join
+ Timeout.timeout(3) do
+ @controller.latch.await
+ end
+ assert_match 'Work complete', output.rewind && output.read
+ end
+ end
+
def test_thread_locals_get_copied
@controller.tc = self
Thread.current[:originating_thread] = Thread.current.object_id