aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--actionpack/CHANGELOG.md16
-rw-r--r--actionpack/lib/action_controller.rb1
-rw-r--r--actionpack/lib/action_controller/metal/live.rb138
-rw-r--r--actionpack/lib/action_controller/test_case.rb12
-rw-r--r--actionpack/lib/action_dispatch.rb3
-rw-r--r--actionpack/lib/action_dispatch/http/response.rb48
-rw-r--r--actionpack/lib/action_view/helpers/capture_helper.rb2
-rw-r--r--actionpack/test/controller/live_stream_test.rb120
-rw-r--r--actionpack/test/controller/send_file_test.rb6
-rw-r--r--actionpack/test/dispatch/live_response_test.rb66
-rw-r--r--activesupport/lib/active_support/concurrency/latch.rb27
11 files changed, 413 insertions, 26 deletions
diff --git a/actionpack/CHANGELOG.md b/actionpack/CHANGELOG.md
index ec85f67e58..d5326e3d0b 100644
--- a/actionpack/CHANGELOG.md
+++ b/actionpack/CHANGELOG.md
@@ -1,5 +1,21 @@
## Rails 4.0.0 (unreleased) ##
+* Added ActionController::Live. Mix it in to your controller and you can
+ stream data to the client live. For example:
+
+ class FooController < ActionController::Base
+ include ActionController::Live
+
+ def index
+ 100.times {
+ # Client will see this as it's written
+ response.stream.write "hello world\n"
+ sleep 1
+ }
+ response.stream.close
+ end
+ end
+
* Remove ActionDispatch::Head middleware in favor of Rack::Head. *Santiago Pastorino*
* Deprecate `:confirm` in favor of `:data => { :confirm => "Text" }` option for `button_to`, `button_tag`, `image_submit_tag`, `link_to` and `submit_tag` helpers.
diff --git a/actionpack/lib/action_controller.rb b/actionpack/lib/action_controller.rb
index 7c10fcbb8a..1d06c83338 100644
--- a/actionpack/lib/action_controller.rb
+++ b/actionpack/lib/action_controller.rb
@@ -1,5 +1,6 @@
require 'abstract_controller'
require 'action_dispatch'
+require 'action_controller/metal/live'
module ActionController
extend ActiveSupport::Autoload
diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb
new file mode 100644
index 0000000000..4674f0348d
--- /dev/null
+++ b/actionpack/lib/action_controller/metal/live.rb
@@ -0,0 +1,138 @@
+require 'action_dispatch/http/response'
+require 'delegate'
+
+module ActionController # :nodoc:
+ # Mix this module in to your controller, and all actions in that controller
+ # will be able to stream data to the client as it's written. For example:
+ #
+ # class MyController < ActionController::Base
+ # include ActionController::Live
+ #
+ # def stream
+ # response.headers['Content-Type'] = 'text/event-stream'
+ # 100.times {
+ # response.stream.write "hello world\n"
+ # sleep 1
+ # }
+ # response.stream.close
+ # end
+ # end
+ #
+ # There are a few caveats with this use. You *cannot* write headers after the
+ # response has been committed (Response#committed? will return truthy).
+ # Calling +write+ or +close+ on the response stream will cause the response
+ # object to be committed. Make sure all headers are set before calling write
+ # or close on your stream.
+ #
+ # You *must* call close on your stream when you're finished, otherwise the
+ # socket may be left open forever.
+ #
+ # The final caveat is that you actions are executed in a separate thread than
+ # the main thread. Make sure your actions are thread safe, and this shouldn't
+ # be a problem (don't share state across threads, etc).
+ module Live
+ class Buffer < ActionDispatch::Response::Buffer # :nodoc:
+ def initialize(response)
+ super(response, Queue.new)
+ end
+
+ def write(string)
+ unless @response.committed?
+ @response.headers["Cache-Control"] = "no-cache"
+ @response.headers.delete("Content-Length")
+ end
+
+ super
+ end
+
+ def each
+ while str = @buf.pop
+ yield str
+ end
+ end
+
+ def close
+ super
+ @buf.push nil
+ end
+ end
+
+ class Response < ActionDispatch::Response # :nodoc:
+ class Header < DelegateClass(Hash) # :nodoc:
+ def initialize(response, header)
+ @response = response
+ super(header)
+ end
+
+ def []=(k,v)
+ if @response.committed?
+ raise ActionDispatch::IllegalStateError, 'header already sent'
+ end
+
+ super
+ end
+
+ def to_hash
+ __getobj__.dup
+ end
+ end
+
+ def initialize(status = 200, header = {}, body = [])
+ header = Header.new self, header
+ super(status, header, body)
+ end
+
+ def commit!
+ headers.freeze
+ super
+ end
+
+ private
+
+ def build_buffer(response, body)
+ buf = Live::Buffer.new response
+ body.each { |part| buf.write part }
+ buf
+ end
+ end
+
+ def process(name)
+ t1 = Thread.current
+ locals = t1.keys.map { |key| [key, t1[key]] }
+
+ # This processes the action in a child thread. It lets us return the
+ # response code and headers back up the rack stack, and still process
+ # the body in parallel with sending data to the client
+ Thread.new {
+ t2 = Thread.current
+ t2.abort_on_exception = true
+
+ # Since we're processing the view in a different thread, copy the
+ # thread locals from the main thread to the child thread. :'(
+ locals.each { |k,v| t2[k] = v }
+
+ begin
+ super(name)
+ ensure
+ @_response.commit!
+ end
+ }
+
+ @_response.await_commit
+ end
+
+ def response_body=(body)
+ super
+ response.stream.close if response
+ end
+
+ def set_response!(request)
+ if request.env["HTTP_VERSION"] == "HTTP/1.0"
+ super
+ else
+ @_response = Live::Response.new
+ @_response.request = request
+ end
+ end
+ end
+end
diff --git a/actionpack/lib/action_controller/test_case.rb b/actionpack/lib/action_controller/test_case.rb
index c4c825ba6b..ca1ecc43a1 100644
--- a/actionpack/lib/action_controller/test_case.rb
+++ b/actionpack/lib/action_controller/test_case.rb
@@ -517,8 +517,8 @@ module ActionController
end
def setup_controller_request_and_response
- @request = TestRequest.new
- @response = TestResponse.new
+ @request = build_request
+ @response = build_response
@response.request = @request
@controller = nil unless defined? @controller
@@ -539,6 +539,14 @@ module ActionController
end
end
+ def build_request
+ TestRequest.new
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
included do
include ActionController::TemplateAssertions
include ActionDispatch::Assertions
diff --git a/actionpack/lib/action_dispatch.rb b/actionpack/lib/action_dispatch.rb
index 1e4ac70f3d..c259b865cc 100644
--- a/actionpack/lib/action_dispatch.rb
+++ b/actionpack/lib/action_dispatch.rb
@@ -36,6 +36,9 @@ end
module ActionDispatch
extend ActiveSupport::Autoload
+ class IllegalStateError < StandardError
+ end
+
autoload_under 'http' do
autoload :Request
autoload :Response
diff --git a/actionpack/lib/action_dispatch/http/response.rb b/actionpack/lib/action_dispatch/http/response.rb
index 912463bc0e..17e74656af 100644
--- a/actionpack/lib/action_dispatch/http/response.rb
+++ b/actionpack/lib/action_dispatch/http/response.rb
@@ -42,7 +42,7 @@ module ActionDispatch # :nodoc:
alias_method :headers, :header
delegate :[], :[]=, :to => :@header
- delegate :each, :to => :@body
+ delegate :each, :to => :@stream
# Sets the HTTP response's content MIME type. For example, in the controller
# you could write this:
@@ -106,8 +106,6 @@ module ActionDispatch # :nodoc:
@committed = false
@content_type = nil
@charset = nil
- @stream = build_buffer self, @body
-
if content_type = self[CONTENT_TYPE]
type, charset = content_type.split(/;\s*charset=/)
@@ -162,14 +160,14 @@ module ActionDispatch # :nodoc:
def respond_to?(method)
if method.to_sym == :to_path
- @body.respond_to?(:to_path)
+ stream.respond_to?(:to_path)
else
super
end
end
def to_path
- @body.to_path
+ stream.to_path
end
def body
@@ -183,11 +181,17 @@ module ActionDispatch # :nodoc:
def body=(body)
@blank = true if body == EMPTY
- @body = munge_body_object(body)
+ if body.respond_to?(:to_path)
+ @stream = body
+ else
+ @stream = build_buffer self, munge_body_object(body)
+ end
end
def body_parts
- @body
+ parts = []
+ @stream.each { |x| parts << x }
+ parts
end
def set_cookie(key, value)
@@ -208,21 +212,11 @@ module ActionDispatch # :nodoc:
end
def close
- @body.close if @body.respond_to?(:close)
+ stream.close if stream.respond_to?(:close)
end
def to_a
- assign_default_content_type_and_charset!
- handle_conditional_get!
-
- @header[SET_COOKIE] = @header[SET_COOKIE].join("\n") if @header[SET_COOKIE].respond_to?(:join)
-
- if [204, 304].include?(@status)
- @header.delete CONTENT_TYPE
- [@status, @header, []]
- else
- [@status, @header, self]
- end
+ rack_response @status, @header.to_hash
end
alias prepare! to_a
alias to_ary to_a # For implicit splat on 1.9.2
@@ -254,7 +248,7 @@ module ActionDispatch # :nodoc:
body.respond_to?(:each) ? body : [body]
end
- def assign_default_content_type_and_charset!
+ def assign_default_content_type_and_charset!(headers)
return if headers[CONTENT_TYPE].present?
@content_type ||= Mime::HTML
@@ -265,5 +259,19 @@ module ActionDispatch # :nodoc:
headers[CONTENT_TYPE] = type
end
+
+ def rack_response(status, header)
+ assign_default_content_type_and_charset!(header)
+ handle_conditional_get!
+
+ header[SET_COOKIE] = header[SET_COOKIE].join("\n") if header[SET_COOKIE].respond_to?(:join)
+
+ if [204, 304].include?(@status)
+ header.delete CONTENT_TYPE
+ [status, header, []]
+ else
+ [status, header, self]
+ end
+ end
end
end
diff --git a/actionpack/lib/action_view/helpers/capture_helper.rb b/actionpack/lib/action_view/helpers/capture_helper.rb
index 9186855319..651d6e4d16 100644
--- a/actionpack/lib/action_view/helpers/capture_helper.rb
+++ b/actionpack/lib/action_view/helpers/capture_helper.rb
@@ -213,7 +213,7 @@ module ActionView
# Add the output buffer to the response body and start a new one.
def flush_output_buffer #:nodoc:
if output_buffer && !output_buffer.empty?
- response.body_parts << output_buffer
+ response.stream.write output_buffer
self.output_buffer = output_buffer.respond_to?(:clone_empty) ? output_buffer.clone_empty : output_buffer[0, 0]
nil
end
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb
new file mode 100644
index 0000000000..5ce1acccf1
--- /dev/null
+++ b/actionpack/test/controller/live_stream_test.rb
@@ -0,0 +1,120 @@
+require 'abstract_unit'
+require 'active_support/concurrency/latch'
+
+module ActionController
+ class LiveStreamTest < ActionController::TestCase
+ class TestController < ActionController::Base
+ include ActionController::Live
+
+ attr_accessor :latch, :tc
+
+ def self.controller_path
+ 'test'
+ end
+
+ def render_text
+ render :text => 'zomg'
+ end
+
+ def default_header
+ response.stream.write "<html><body>hi</body></html>"
+ response.stream.close
+ end
+
+ def basic_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+
+ def blocking_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ latch.await
+ end
+ response.stream.close
+ end
+
+ def thread_locals
+ tc.assert_equal 'aaron', Thread.current[:setting]
+ tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread]
+
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+ end
+
+ tests TestController
+
+ class TestResponse < Live::Response
+ def recycle!
+ initialize
+ end
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
+ def test_set_response!
+ @controller.set_response!(@request)
+ assert_kind_of(Live::Response, @controller.response)
+ assert_equal @request, @controller.response.request
+ end
+
+ def test_write_to_stream
+ @controller = TestController.new
+ get :basic_stream
+ assert_equal "helloworld", @response.body
+ assert_equal 'text/event-stream', @response.headers['Content-Type']
+ end
+
+ def test_async_stream
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ parts = ['hello', 'world']
+
+ @controller.request = @request
+ @controller.response = @response
+
+ t = Thread.new(@response) { |resp|
+ resp.stream.each do |part|
+ assert_equal parts.shift, part
+ ol = @controller.latch
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ ol.release
+ end
+ }
+
+ @controller.process :blocking_stream
+
+ assert t.join
+ end
+
+ def test_thread_locals_get_copied
+ @controller.tc = self
+ Thread.current[:originating_thread] = Thread.current.object_id
+ Thread.current[:setting] = 'aaron'
+
+ get :thread_locals
+ end
+
+ def test_live_stream_default_header
+ @controller.request = @request
+ @controller.response = @response
+ @controller.process :default_header
+ _, headers, _ = @response.prepare!
+ assert headers['Content-Type']
+ end
+
+ def test_render_text
+ get :render_text
+ assert @response.stream.closed?, 'stream should be closed'
+ end
+ end
+end
diff --git a/actionpack/test/controller/send_file_test.rb b/actionpack/test/controller/send_file_test.rb
index 6fc3556e31..97ede35317 100644
--- a/actionpack/test/controller/send_file_test.rb
+++ b/actionpack/test/controller/send_file_test.rb
@@ -51,14 +51,14 @@ class SendFileTest < ActionController::TestCase
response = nil
assert_nothing_raised { response = process('file') }
assert_not_nil response
- assert_respond_to response.body_parts, :each
- assert_respond_to response.body_parts, :to_path
+ assert_respond_to response.stream, :each
+ assert_respond_to response.stream, :to_path
require 'stringio'
output = StringIO.new
output.binmode
output.string.force_encoding(file_data.encoding)
- assert_nothing_raised { response.body_parts.each { |part| output << part.to_s } }
+ response.body_parts.each { |part| output << part.to_s }
assert_equal file_data, output.string
end
diff --git a/actionpack/test/dispatch/live_response_test.rb b/actionpack/test/dispatch/live_response_test.rb
new file mode 100644
index 0000000000..87a6b1383d
--- /dev/null
+++ b/actionpack/test/dispatch/live_response_test.rb
@@ -0,0 +1,66 @@
+require 'abstract_unit'
+require 'active_support/concurrency/latch'
+
+module ActionController
+ module Live
+ class ResponseTest < ActiveSupport::TestCase
+ def setup
+ @response = Live::Response.new
+ end
+
+ def test_parallel
+ latch = ActiveSupport::Concurrency::Latch.new
+
+ t = Thread.new {
+ @response.stream.write 'foo'
+ latch.await
+ @response.stream.close
+ }
+
+ @response.each do |part|
+ assert_equal 'foo', part
+ latch.release
+ end
+ assert t.join
+ end
+
+ def test_setting_body_populates_buffer
+ @response.body = 'omg'
+ @response.close
+ assert_equal ['omg'], @response.body_parts
+ end
+
+ def test_cache_control_is_set
+ @response.stream.write 'omg'
+ assert_equal 'no-cache', @response.headers['Cache-Control']
+ end
+
+ def test_content_length_is_removed
+ @response.headers['Content-Length'] = "1234"
+ @response.stream.write 'omg'
+ assert_nil @response.headers['Content-Length']
+ end
+
+ def test_headers_cannot_be_written_after_write
+ @response.stream.write 'omg'
+
+ assert @response.headers.frozen?
+ e = assert_raises(ActionDispatch::IllegalStateError) do
+ @response.headers['Content-Length'] = "zomg"
+ end
+
+ assert_equal 'header already sent', e.message
+ end
+
+ def test_headers_cannot_be_written_after_close
+ @response.stream.close
+
+ assert @response.headers.frozen?
+ e = assert_raises(ActionDispatch::IllegalStateError) do
+ @response.headers['Content-Length'] = "zomg"
+ end
+ assert_equal 'header already sent', e.message
+ end
+ end
+ end
+end
diff --git a/activesupport/lib/active_support/concurrency/latch.rb b/activesupport/lib/active_support/concurrency/latch.rb
new file mode 100644
index 0000000000..1507de433e
--- /dev/null
+++ b/activesupport/lib/active_support/concurrency/latch.rb
@@ -0,0 +1,27 @@
+require 'thread'
+require 'monitor'
+
+module ActiveSupport
+ module Concurrency
+ class Latch
+ def initialize(count = 1)
+ @count = count
+ @lock = Monitor.new
+ @cv = @lock.new_cond
+ end
+
+ def release
+ @lock.synchronize do
+ @count -= 1 if @count > 0
+ @cv.broadcast if @count.zero?
+ end
+ end
+
+ def await
+ @lock.synchronize do
+ @cv.wait_while { @count > 0 }
+ end
+ end
+ end
+ end
+end