aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--actionpack/lib/action_controller/metal/live.rb28
-rw-r--r--actionpack/lib/action_controller/test_case.rb12
-rw-r--r--actionpack/test/controller/live_stream_test.rb72
3 files changed, 105 insertions, 7 deletions
diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb
index efeeefda9d..48818cb8e4 100644
--- a/actionpack/lib/action_controller/metal/live.rb
+++ b/actionpack/lib/action_controller/metal/live.rb
@@ -6,8 +6,7 @@ module ActionController
class Response < ActionDispatch::Response
class Buffer < ActionDispatch::Response::Buffer # :nodoc:
def initialize(response)
- @response = response
- @buf = Queue.new
+ super(response, Queue.new)
end
def write(string)
@@ -59,5 +58,30 @@ module ActionController
buf
end
end
+
+ def process(name)
+ t1 = Thread.current
+ locals = t1.keys.map { |key| [key, t1[key]] }
+
+ # This processes the action in a child thread. It lets us return the
+ # response code and headers back up the rack stack, and still process
+ # the body in parallel with sending data to the client
+ Thread.new {
+ t2 = Thread.current
+ t2.abort_on_exception = true
+
+ # Since we're processing the view in a different thread, copy the
+ # thread locals from the main thread to the child thread. :'(
+ locals.each { |k,v| t2[k] = v }
+
+ begin
+ super(name)
+ ensure
+ @_response.commit!
+ end
+ }
+
+ @_response.await_commit
+ end
end
end
diff --git a/actionpack/lib/action_controller/test_case.rb b/actionpack/lib/action_controller/test_case.rb
index c4c825ba6b..ca1ecc43a1 100644
--- a/actionpack/lib/action_controller/test_case.rb
+++ b/actionpack/lib/action_controller/test_case.rb
@@ -517,8 +517,8 @@ module ActionController
end
def setup_controller_request_and_response
- @request = TestRequest.new
- @response = TestResponse.new
+ @request = build_request
+ @response = build_response
@response.request = @request
@controller = nil unless defined? @controller
@@ -539,6 +539,14 @@ module ActionController
end
end
+ def build_request
+ TestRequest.new
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
included do
include ActionController::TemplateAssertions
include ActionDispatch::Assertions
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb
index 6ee6444065..5bb37a271d 100644
--- a/actionpack/test/controller/live_stream_test.rb
+++ b/actionpack/test/controller/live_stream_test.rb
@@ -1,16 +1,41 @@
require 'abstract_unit'
+require 'active_support/concurrency/latch'
module ActionController
- class StreamingResponseTest < ActionController::TestCase
+ class LiveStreamTest < ActionController::TestCase
class TestController < ActionController::Base
+ include ActionController::Live
+
+ attr_accessor :latch, :tc
+
def self.controller_path
'test'
end
def basic_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+
+ def blocking_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ latch.await
+ end
+ response.stream.close
+ end
+
+ def thread_locals
+ tc.assert_equal 'aaron', Thread.current[:setting]
+ tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread]
+
+ response.headers['Content-Type'] = 'text/event-stream'
%w{ hello world }.each do |word|
response.stream.write word
- response.stream.write "\n"
end
response.stream.close
end
@@ -18,9 +43,50 @@ module ActionController
tests TestController
+ class TestResponse < Live::Response
+ def recycle!
+ initialize
+ end
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
def test_write_to_stream
+ @controller = TestController.new
get :basic_stream
- assert_equal "hello\nworld\n", @response.body
+ assert_equal "helloworld", @response.body
+ assert_equal 'text/event-stream', @response.headers['Content-Type']
+ end
+
+ def test_async_stream
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ parts = ['hello', 'world']
+
+ @controller.request = @request
+ @controller.response = @response
+
+ t = Thread.new(@response) { |resp|
+ resp.stream.each do |part|
+ assert_equal parts.shift, part
+ ol = @controller.latch
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ ol.release
+ end
+ }
+
+ @controller.process :blocking_stream
+
+ assert t.join
+ end
+
+ def test_thread_locals_get_copied
+ @controller.tc = self
+ Thread.current[:originating_thread] = Thread.current.object_id
+ Thread.current[:setting] = 'aaron'
+
+ get :thread_locals
end
end
end