aboutsummaryrefslogtreecommitdiffstats
path: root/actionpack/test/controller/live_stream_test.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actionpack/test/controller/live_stream_test.rb')
-rw-r--r--actionpack/test/controller/live_stream_test.rb285
1 files changed, 172 insertions, 113 deletions
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb
index 0c65270ec1..e76628b936 100644
--- a/actionpack/test/controller/live_stream_test.rb
+++ b/actionpack/test/controller/live_stream_test.rb
@@ -1,5 +1,5 @@
-require 'abstract_unit'
-require 'active_support/concurrency/latch'
+require "abstract_unit"
+require "concurrent/atomic/count_down_latch"
Thread.abort_on_exception = true
module ActionController
@@ -8,10 +8,10 @@ module ActionController
include ActionController::Live
def basic_sse
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
sse = SSE.new(response.stream)
sse.write("{\"name\":\"John\"}")
- sse.write({ name: "Ryan" })
+ sse.write(name: "Ryan")
ensure
sse.close
end
@@ -19,7 +19,7 @@ module ActionController
def sse_with_event
sse = SSE.new(response.stream, event: "send-name")
sse.write("{\"name\":\"John\"}")
- sse.write({ name: "Ryan" })
+ sse.write(name: "Ryan")
ensure
sse.close
end
@@ -112,10 +112,10 @@ module ActionController
class TestController < ActionController::Base
include ActionController::Live
- attr_accessor :latch, :tc
+ attr_accessor :latch, :tc, :error_latch
def self.controller_path
- 'test'
+ "test"
end
def set_cookie
@@ -125,7 +125,7 @@ module ActionController
end
def render_text
- render :text => 'zomg'
+ render plain: "zomg"
end
def default_header
@@ -134,7 +134,7 @@ module ActionController
end
def basic_stream
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
%w{ hello world }.each do |word|
response.stream.write word
end
@@ -142,19 +142,34 @@ module ActionController
end
def blocking_stream
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
%w{ hello world }.each do |word|
response.stream.write word
- latch.await
+ latch.wait
end
response.stream.close
end
+ def write_sleep_autoload
+ path = File.join(File.dirname(__FILE__), "../fixtures")
+ ActiveSupport::Dependencies.autoload_paths << path
+
+ response.headers["Content-Type"] = "text/event-stream"
+ response.stream.write "before load"
+ sleep 0.01
+ silence_warning do
+ ::LoadMe
+ end
+ response.stream.close
+ latch.count_down
+
+ ActiveSupport::Dependencies.autoload_paths.reject! { |p| p == path }
+ end
+
def thread_locals
- tc.assert_equal 'aaron', Thread.current[:setting]
- tc.assert_not_equal Thread.current.object_id, Thread.current[:originating_thread]
+ tc.assert_equal "aaron", Thread.current[:setting]
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
%w{ hello world }.each do |word|
response.stream.write word
end
@@ -162,20 +177,20 @@ module ActionController
end
def with_stale
- render text: 'stale' if stale?(etag: "123", template: false)
+ render plain: "stale" if stale?(etag: "123", template: false)
end
def exception_in_view
- render 'doesntexist'
+ render "doesntexist"
end
def exception_in_view_after_commit
response.stream.write ""
- render 'doesntexist'
+ render "doesntexist"
end
def exception_with_callback
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
response.stream.on_error do
response.stream.write %(data: "500 Internal Server Error"\n\n)
@@ -183,11 +198,11 @@ module ActionController
end
response.stream.write "" # make sure the response is committed
- raise 'An exception occurred...'
+ raise "An exception occurred..."
end
def exception_in_controller
- raise Exception, 'Exception in controller'
+ raise Exception, "Exception in controller"
end
def bad_request_error
@@ -195,53 +210,60 @@ module ActionController
end
def exception_in_exception_callback
- response.headers['Content-Type'] = 'text/event-stream'
+ response.headers["Content-Type"] = "text/event-stream"
response.stream.on_error do
- raise 'We need to go deeper.'
+ raise "We need to go deeper."
end
- response.stream.write ''
+ response.stream.write ""
response.stream.write params[:widget][:didnt_check_for_nil]
end
def overfill_buffer_and_die
+ logger = ActionController::Base.logger || Logger.new($stdout)
+ response.stream.on_error do
+ logger.warn "Error while streaming."
+ error_latch.count_down
+ end
+
# Write until the buffer is full. It doesn't expose that
# information directly, so we must hard-code its size:
10.times do
- response.stream.write '.'
+ response.stream.write "."
end
# .. plus one more, because the #each frees up a slot:
- response.stream.write '.'
+ response.stream.write "."
- latch.release
+ latch.count_down
# This write will block, and eventually raise
- response.stream.write 'x'
+ response.stream.write "x"
20.times do
- response.stream.write '.'
+ response.stream.write "."
end
end
def ignore_client_disconnect
response.stream.ignore_disconnect = true
- response.stream.write '' # commit
+ response.stream.write "" # commit
# These writes will be ignored
15.times do
- response.stream.write 'x'
+ response.stream.write "x"
end
- logger.info 'Work complete'
- latch.release
+ logger.info "Work complete"
+ latch.count_down
end
end
tests TestController
def assert_stream_closed
- assert response.stream.closed?, 'stream should be closed'
- assert response.sent?, 'stream should be sent'
+ assert response.stream.closed?, "stream should be closed"
+ assert response.committed?, "response should be committed"
+ assert response.sent?, "response should be sent"
end
def capture_log_output
@@ -255,125 +277,117 @@ module ActionController
end
end
+ def setup
+ super
+
+ def @controller.new_controller_thread
+ Thread.new { yield }
+ end
+ end
+
def test_set_cookie
- @controller = TestController.new
get :set_cookie
- assert_equal({'hello' => 'world'}, @response.cookies)
+ assert_equal({ "hello" => "world" }, @response.cookies)
assert_equal "hello world", @response.body
end
- def test_set_response!
- @controller.set_response!(@request)
- assert_kind_of(Live::Response, @controller.response)
- assert_equal @request, @controller.response.request
- end
-
def test_write_to_stream
- @controller = TestController.new
get :basic_stream
assert_equal "helloworld", @response.body
- assert_equal 'text/event-stream', @response.headers['Content-Type']
+ assert_equal "text/event-stream", @response.headers["Content-Type"]
+ end
+
+ def test_delayed_autoload_after_write_within_interlock_hook
+ # Simulate InterlockHook
+ ActiveSupport::Dependencies.interlock.start_running
+ res = get :write_sleep_autoload
+ res.each {}
+ ActiveSupport::Dependencies.interlock.done_running
end
def test_async_stream
rubinius_skip "https://github.com/rubinius/rubinius/issues/2934"
- @controller.latch = ActiveSupport::Concurrency::Latch.new
- parts = ['hello', 'world']
+ @controller.latch = Concurrent::CountDownLatch.new
+ parts = ["hello", "world"]
- @controller.request = @request
- @controller.response = @response
+ get :blocking_stream
- t = Thread.new(@response) { |resp|
+ t = Thread.new(response) { |resp|
resp.await_commit
resp.stream.each do |part|
assert_equal parts.shift, part
ol = @controller.latch
- @controller.latch = ActiveSupport::Concurrency::Latch.new
- ol.release
+ @controller.latch = Concurrent::CountDownLatch.new
+ ol.count_down
end
}
- @controller.process :blocking_stream
-
- assert t.join(3), 'timeout expired before the thread terminated'
+ assert t.join(3), "timeout expired before the thread terminated"
end
def test_abort_with_full_buffer
- @controller.latch = ActiveSupport::Concurrency::Latch.new
-
- @request.parameters[:format] = 'plain'
- @controller.request = @request
- @controller.response = @response
-
- got_error = ActiveSupport::Concurrency::Latch.new
- @response.stream.on_error do
- ActionController::Base.logger.warn 'Error while streaming'
- got_error.release
- end
-
- t = Thread.new(@response) { |resp|
- resp.await_commit
- _, _, body = resp.to_a
- body.each do |part|
- @controller.latch.await
- body.close
- break
- end
- }
+ @controller.latch = Concurrent::CountDownLatch.new
+ @controller.error_latch = Concurrent::CountDownLatch.new
capture_log_output do |output|
- @controller.process :overfill_buffer_and_die
+ get :overfill_buffer_and_die, format: "plain"
+
+ t = Thread.new(response) { |resp|
+ resp.await_commit
+ _, _, body = resp.to_a
+ body.each do
+ @controller.latch.wait
+ body.close
+ break
+ end
+ }
+
t.join
- got_error.await
- assert_match 'Error while streaming', output.rewind && output.read
+ @controller.error_latch.wait
+ assert_match "Error while streaming", output.rewind && output.read
end
end
def test_ignore_client_disconnect
- @controller.latch = ActiveSupport::Concurrency::Latch.new
+ @controller.latch = Concurrent::CountDownLatch.new
- @controller.request = @request
- @controller.response = @response
+ capture_log_output do |output|
+ get :ignore_client_disconnect
- t = Thread.new(@response) { |resp|
- resp.await_commit
- _, _, body = resp.to_a
- body.each do |part|
- body.close
- break
- end
- }
+ t = Thread.new(response) { |resp|
+ resp.await_commit
+ _, _, body = resp.to_a
+ body.each do
+ body.close
+ break
+ end
+ }
- capture_log_output do |output|
- @controller.process :ignore_client_disconnect
t.join
Timeout.timeout(3) do
- @controller.latch.await
+ @controller.latch.wait
end
- assert_match 'Work complete', output.rewind && output.read
+ assert_match "Work complete", output.rewind && output.read
end
end
def test_thread_locals_get_copied
@controller.tc = self
Thread.current[:originating_thread] = Thread.current.object_id
- Thread.current[:setting] = 'aaron'
+ Thread.current[:setting] = "aaron"
get :thread_locals
end
def test_live_stream_default_header
- @controller.request = @request
- @controller.response = @response
- @controller.process :default_header
- _, headers, _ = @response.prepare!
- assert headers['Content-Type']
+ get :default_header
+ assert response.headers["Content-Type"]
end
def test_render_text
get :render_text
- assert_equal 'zomg', response.body
+ assert_equal "zomg", response.body
assert_stream_closed
end
@@ -385,7 +399,7 @@ module ActionController
capture_log_output do |output|
get :exception_in_view_after_commit
assert_match %r((window\.location = "/500\.html"</script></html>)$), response.body
- assert_match 'Missing template test/doesntexist', output.rewind && output.read
+ assert_match "Missing template test/doesntexist", output.rewind && output.read
assert_stream_closed
end
assert response.body
@@ -399,51 +413,57 @@ module ActionController
capture_log_output do |output|
get :exception_in_view_after_commit, format: :json
- assert_equal '', response.body
- assert_match 'Missing template test/doesntexist', output.rewind && output.read
+ assert_equal "", response.body
+ assert_match "Missing template test/doesntexist", output.rewind && output.read
assert_stream_closed
end
end
def test_exception_callback_when_committed
+ current_threads = Thread.list
+
capture_log_output do |output|
- get :exception_with_callback, format: 'text/event-stream'
+ get :exception_with_callback, format: "text/event-stream"
+
+ # Wait on the execution of all threads
+ (Thread.list - current_threads).each(&:join)
+
assert_equal %(data: "500 Internal Server Error"\n\n), response.body
- assert_match 'An exception occurred...', output.rewind && output.read
+ assert_match "An exception occurred...", output.rewind && output.read
assert_stream_closed
end
end
def test_exception_in_controller_before_streaming
assert_raises(ActionController::LiveStreamTest::Exception) do
- get :exception_in_controller, format: 'text/event-stream'
+ get :exception_in_controller, format: "text/event-stream"
end
end
def test_bad_request_in_controller_before_streaming
assert_raises(ActionController::BadRequest) do
- get :bad_request_error, format: 'text/event-stream'
+ get :bad_request_error, format: "text/event-stream"
end
end
def test_exceptions_raised_handling_exceptions_and_committed
capture_log_output do |output|
- get :exception_in_exception_callback, format: 'text/event-stream'
- assert_equal '', response.body
- assert_match 'We need to go deeper', output.rewind && output.read
+ get :exception_in_exception_callback, format: "text/event-stream"
+ assert_equal "", response.body
+ assert_match "We need to go deeper", output.rewind && output.read
assert_stream_closed
end
end
def test_stale_without_etag
get :with_stale
- assert_equal 200, @response.status.to_i
+ assert_equal 200, response.status.to_i
end
def test_stale_with_etag
- @request.if_none_match = Digest::MD5.hexdigest("123")
+ @request.if_none_match = %(W/"#{Digest::MD5.hexdigest('123')}")
get :with_stale
- assert_equal 304, @response.status.to_i
+ assert_equal 304, response.status.to_i
end
end
@@ -454,3 +474,42 @@ module ActionController
end
end
end
+
+class LiveStreamRouterTest < ActionDispatch::IntegrationTest
+ class TestController < ActionController::Base
+ include ActionController::Live
+
+ def index
+ response.headers["Content-Type"] = "text/event-stream"
+ sse = SSE.new(response.stream)
+ sse.write("{\"name\":\"John\"}")
+ sse.write(name: "Ryan")
+ ensure
+ sse.close
+ end
+ end
+
+ def self.call(env)
+ routes.call(env)
+ end
+
+ def self.routes
+ @routes ||= ActionDispatch::Routing::RouteSet.new
+ end
+
+ routes.draw do
+ get "/test" => "live_stream_router_test/test#index"
+ end
+
+ def app
+ self.class
+ end
+
+ test "streaming served through the router" do
+ get "/test"
+
+ assert_response :ok
+ assert_match(/data: {\"name\":\"John\"}/, response.body)
+ assert_match(/data: {\"name\":\"Ryan\"}/, response.body)
+ end
+end