diff options
Diffstat (limited to 'actionpack/test/controller/live_stream_test.rb')
-rw-r--r-- | actionpack/test/controller/live_stream_test.rb | 518 |
1 files changed, 518 insertions, 0 deletions
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb new file mode 100644 index 0000000000..431fe90b23 --- /dev/null +++ b/actionpack/test/controller/live_stream_test.rb @@ -0,0 +1,518 @@ +# frozen_string_literal: true + +require "abstract_unit" +require "timeout" +require "concurrent/atomic/count_down_latch" +Thread.abort_on_exception = true + +module ActionController + class SSETest < ActionController::TestCase + class SSETestController < ActionController::Base + include ActionController::Live + + def basic_sse + response.headers["Content-Type"] = "text/event-stream" + sse = SSE.new(response.stream) + sse.write("{\"name\":\"John\"}") + sse.write(name: "Ryan") + ensure + sse.close + end + + def sse_with_event + sse = SSE.new(response.stream, event: "send-name") + sse.write("{\"name\":\"John\"}") + sse.write(name: "Ryan") + ensure + sse.close + end + + def sse_with_retry + sse = SSE.new(response.stream, retry: 1000) + sse.write("{\"name\":\"John\"}") + sse.write({ name: "Ryan" }, { retry: 1500 }) + ensure + sse.close + end + + def sse_with_id + sse = SSE.new(response.stream) + sse.write("{\"name\":\"John\"}", id: 1) + sse.write({ name: "Ryan" }, { id: 2 }) + ensure + sse.close + end + + def sse_with_multiple_line_message + sse = SSE.new(response.stream) + sse.write("first line.\nsecond line.") + ensure + sse.close + end + end + + tests SSETestController + + def wait_for_response_stream_close + response.body + end + + def test_basic_sse + get :basic_sse + + wait_for_response_stream_close + assert_match(/data: {\"name\":\"John\"}/, response.body) + assert_match(/data: {\"name\":\"Ryan\"}/, response.body) + end + + def test_sse_with_event_name + get :sse_with_event + + wait_for_response_stream_close + assert_match(/data: {\"name\":\"John\"}/, response.body) + assert_match(/data: {\"name\":\"Ryan\"}/, response.body) + assert_match(/event: send-name/, response.body) + end + + def test_sse_with_retry + get :sse_with_retry + + wait_for_response_stream_close + first_response, second_response = response.body.split("\n\n") + assert_match(/data: {\"name\":\"John\"}/, first_response) + assert_match(/retry: 1000/, first_response) + + assert_match(/data: {\"name\":\"Ryan\"}/, second_response) + assert_match(/retry: 1500/, second_response) + end + + def test_sse_with_id + get :sse_with_id + + wait_for_response_stream_close + first_response, second_response = response.body.split("\n\n") + assert_match(/data: {\"name\":\"John\"}/, first_response) + assert_match(/id: 1/, first_response) + + assert_match(/data: {\"name\":\"Ryan\"}/, second_response) + assert_match(/id: 2/, second_response) + end + + def test_sse_with_multiple_line_message + get :sse_with_multiple_line_message + + wait_for_response_stream_close + first_response, second_response = response.body.split("\n") + assert_match(/data: first line/, first_response) + assert_match(/data: second line/, second_response) + end + end + + class LiveStreamTest < ActionController::TestCase + class Exception < StandardError + end + + class TestController < ActionController::Base + include ActionController::Live + + attr_accessor :latch, :tc, :error_latch + + def self.controller_path + "test" + end + + def set_cookie + cookies[:hello] = "world" + response.stream.write "hello world" + response.close + end + + def render_text + render plain: "zomg" + end + + def default_header + response.stream.write "<html><body>hi</body></html>" + response.stream.close + end + + def basic_stream + response.headers["Content-Type"] = "text/event-stream" + %w{ hello world }.each do |word| + response.stream.write word + end + response.stream.close + end + + def blocking_stream + response.headers["Content-Type"] = "text/event-stream" + %w{ hello world }.each do |word| + response.stream.write word + latch.wait + end + response.stream.close + end + + def write_sleep_autoload + path = File.expand_path("../fixtures", __dir__) + ActiveSupport::Dependencies.autoload_paths << path + + response.headers["Content-Type"] = "text/event-stream" + response.stream.write "before load" + sleep 0.01 + silence_warning do + ::LoadMe + end + response.stream.close + latch.count_down + + ActiveSupport::Dependencies.autoload_paths.reject! { |p| p == path } + end + + def thread_locals + tc.assert_equal "aaron", Thread.current[:setting] + + response.headers["Content-Type"] = "text/event-stream" + %w{ hello world }.each do |word| + response.stream.write word + end + response.stream.close + end + + def with_stale + render plain: "stale" if stale?(etag: "123", template: false) + end + + def exception_in_view + render "doesntexist" + end + + def exception_in_view_after_commit + response.stream.write "" + render "doesntexist" + end + + def exception_with_callback + response.headers["Content-Type"] = "text/event-stream" + + response.stream.on_error do + response.stream.write %(data: "500 Internal Server Error"\n\n) + response.stream.close + end + + response.stream.write "" # make sure the response is committed + raise "An exception occurred..." + end + + def exception_in_controller + raise Exception, "Exception in controller" + end + + def bad_request_error + raise ActionController::BadRequest + end + + def exception_in_exception_callback + response.headers["Content-Type"] = "text/event-stream" + response.stream.on_error do + raise "We need to go deeper." + end + response.stream.write "" + response.stream.write params[:widget][:didnt_check_for_nil] + end + + def overfill_buffer_and_die + logger = ActionController::Base.logger || Logger.new($stdout) + response.stream.on_error do + logger.warn "Error while streaming." + error_latch.count_down + end + + # Write until the buffer is full. It doesn't expose that + # information directly, so we must hard-code its size: + 10.times do + response.stream.write "." + end + # .. plus one more, because the #each frees up a slot: + response.stream.write "." + + latch.count_down + + # This write will block, and eventually raise + response.stream.write "x" + + 20.times do + response.stream.write "." + end + end + + def ignore_client_disconnect + response.stream.ignore_disconnect = true + + response.stream.write "" # commit + + # These writes will be ignored + 15.times do + response.stream.write "x" + end + + logger.info "Work complete" + latch.count_down + end + end + + tests TestController + + def assert_stream_closed + assert response.stream.closed?, "stream should be closed" + assert response.committed?, "response should be committed" + assert response.sent?, "response should be sent" + end + + def capture_log_output + output = StringIO.new + old_logger, ActionController::Base.logger = ActionController::Base.logger, ActiveSupport::Logger.new(output) + + begin + yield output + ensure + ActionController::Base.logger = old_logger + end + end + + def setup + super + + def @controller.new_controller_thread + Thread.new { yield } + end + end + + def test_set_cookie + get :set_cookie + assert_equal({ "hello" => "world" }, @response.cookies) + assert_equal "hello world", @response.body + end + + def test_write_to_stream + get :basic_stream + assert_equal "helloworld", @response.body + assert_equal "text/event-stream", @response.headers["Content-Type"] + end + + def test_delayed_autoload_after_write_within_interlock_hook + # Simulate InterlockHook + ActiveSupport::Dependencies.interlock.start_running + res = get :write_sleep_autoload + res.each {} + ActiveSupport::Dependencies.interlock.done_running + end + + def test_async_stream + rubinius_skip "https://github.com/rubinius/rubinius/issues/2934" + + @controller.latch = Concurrent::CountDownLatch.new + parts = ["hello", "world"] + + get :blocking_stream + + t = Thread.new(response) { |resp| + resp.await_commit + resp.stream.each do |part| + assert_equal parts.shift, part + ol = @controller.latch + @controller.latch = Concurrent::CountDownLatch.new + ol.count_down + end + } + + assert t.join(3), "timeout expired before the thread terminated" + end + + def test_abort_with_full_buffer + @controller.latch = Concurrent::CountDownLatch.new + @controller.error_latch = Concurrent::CountDownLatch.new + + capture_log_output do |output| + get :overfill_buffer_and_die, format: "plain" + + t = Thread.new(response) { |resp| + resp.await_commit + _, _, body = resp.to_a + body.each do + @controller.latch.wait + body.close + break + end + } + + t.join + @controller.error_latch.wait + assert_match "Error while streaming", output.rewind && output.read + end + end + + def test_ignore_client_disconnect + @controller.latch = Concurrent::CountDownLatch.new + + capture_log_output do |output| + get :ignore_client_disconnect + + t = Thread.new(response) { |resp| + resp.await_commit + _, _, body = resp.to_a + body.each do + body.close + break + end + } + + t.join + Timeout.timeout(3) do + @controller.latch.wait + end + assert_match "Work complete", output.rewind && output.read + end + end + + def test_thread_locals_get_copied + @controller.tc = self + Thread.current[:originating_thread] = Thread.current.object_id + Thread.current[:setting] = "aaron" + + get :thread_locals + end + + def test_live_stream_default_header + get :default_header + assert response.headers["Content-Type"] + end + + def test_render_text + get :render_text + assert_equal "zomg", response.body + assert_stream_closed + end + + def test_exception_handling_html + assert_raises(ActionView::MissingTemplate) do + get :exception_in_view + end + + capture_log_output do |output| + get :exception_in_view_after_commit + assert_match %r((window\.location = "/500\.html"</script></html>)$), response.body + assert_match "Missing template test/doesntexist", output.rewind && output.read + assert_stream_closed + end + assert response.body + assert_stream_closed + end + + def test_exception_handling_plain_text + assert_raises(ActionView::MissingTemplate) do + get :exception_in_view, format: :json + end + + capture_log_output do |output| + get :exception_in_view_after_commit, format: :json + assert_equal "", response.body + assert_match "Missing template test/doesntexist", output.rewind && output.read + assert_stream_closed + end + end + + def test_exception_callback_when_committed + current_threads = Thread.list + + capture_log_output do |output| + get :exception_with_callback, format: "text/event-stream" + + # Wait on the execution of all threads + (Thread.list - current_threads).each(&:join) + + assert_equal %(data: "500 Internal Server Error"\n\n), response.body + assert_match "An exception occurred...", output.rewind && output.read + assert_stream_closed + end + end + + def test_exception_in_controller_before_streaming + assert_raises(ActionController::LiveStreamTest::Exception) do + get :exception_in_controller, format: "text/event-stream" + end + end + + def test_bad_request_in_controller_before_streaming + assert_raises(ActionController::BadRequest) do + get :bad_request_error, format: "text/event-stream" + end + end + + def test_exceptions_raised_handling_exceptions_and_committed + capture_log_output do |output| + get :exception_in_exception_callback, format: "text/event-stream" + assert_equal "", response.body + assert_match "We need to go deeper", output.rewind && output.read + assert_stream_closed + end + end + + def test_stale_without_etag + get :with_stale + assert_equal 200, response.status.to_i + end + + def test_stale_with_etag + @request.if_none_match = %(W/"#{ActiveSupport::Digest.hexdigest('123')}") + get :with_stale + assert_equal 304, response.status.to_i + end + end + + class BufferTest < ActionController::TestCase + def test_nil_callback + buf = ActionController::Live::Buffer.new nil + assert buf.call_on_error + end + end +end + +class LiveStreamRouterTest < ActionDispatch::IntegrationTest + class TestController < ActionController::Base + include ActionController::Live + + def index + response.headers["Content-Type"] = "text/event-stream" + sse = SSE.new(response.stream) + sse.write("{\"name\":\"John\"}") + sse.write(name: "Ryan") + ensure + sse.close + end + end + + def self.call(env) + routes.call(env) + end + + def self.routes + @routes ||= ActionDispatch::Routing::RouteSet.new + end + + routes.draw do + get "/test" => "live_stream_router_test/test#index" + end + + def app + self.class + end + + test "streaming served through the router" do + get "/test" + + assert_response :ok + assert_match(/data: {\"name\":\"John\"}/, response.body) + assert_match(/data: {\"name\":\"Ryan\"}/, response.body) + end +end |