aboutsummaryrefslogtreecommitdiffstats
path: root/actionpack
diff options
context:
space:
mode:
Diffstat (limited to 'actionpack')
-rw-r--r--actionpack/CHANGELOG.md16
-rw-r--r--actionpack/lib/action_controller.rb1
-rw-r--r--actionpack/lib/action_controller/metal/live.rb138
-rw-r--r--actionpack/lib/action_controller/test_case.rb12
-rw-r--r--actionpack/lib/action_dispatch.rb3
-rw-r--r--actionpack/lib/action_dispatch/http/cache.rb30
-rw-r--r--actionpack/lib/action_dispatch/http/response.rb28
-rw-r--r--actionpack/test/controller/live_stream_test.rb120
-rw-r--r--actionpack/test/dispatch/live_response_test.rb66
9 files changed, 389 insertions, 25 deletions
diff --git a/actionpack/CHANGELOG.md b/actionpack/CHANGELOG.md
index ec85f67e58..d5326e3d0b 100644
--- a/actionpack/CHANGELOG.md
+++ b/actionpack/CHANGELOG.md
@@ -1,5 +1,21 @@
## Rails 4.0.0 (unreleased) ##
+* Added ActionController::Live. Mix it in to your controller and you can
+ stream data to the client live. For example:
+
+ class FooController < ActionController::Base
+ include ActionController::Live
+
+ def index
+ 100.times {
+ # Client will see this as it's written
+ response.stream.write "hello world\n"
+ sleep 1
+ }
+ response.stream.close
+ end
+ end
+
* Remove ActionDispatch::Head middleware in favor of Rack::Head. *Santiago Pastorino*
* Deprecate `:confirm` in favor of `:data => { :confirm => "Text" }` option for `button_to`, `button_tag`, `image_submit_tag`, `link_to` and `submit_tag` helpers.
diff --git a/actionpack/lib/action_controller.rb b/actionpack/lib/action_controller.rb
index 7c10fcbb8a..1d06c83338 100644
--- a/actionpack/lib/action_controller.rb
+++ b/actionpack/lib/action_controller.rb
@@ -1,5 +1,6 @@
require 'abstract_controller'
require 'action_dispatch'
+require 'action_controller/metal/live'
module ActionController
extend ActiveSupport::Autoload
diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb
new file mode 100644
index 0000000000..e6ada0c106
--- /dev/null
+++ b/actionpack/lib/action_controller/metal/live.rb
@@ -0,0 +1,138 @@
+require 'action_dispatch/http/response'
+require 'delegate'
+
+module ActionController
+ # Mix this module in to your controller, and all actions in that controller
+ # will be able to stream data to the client as it's written.
+ #
+ # class MyController < ActionController::Base
+ # include ActionController::Live
+ #
+ # def stream
+ # response.headers['Content-Type'] = 'text/event-stream'
+ # 100.times {
+ # response.stream.write "hello world\n"
+ # sleep 1
+ # }
+ # response.stream.close
+ # end
+ # end
+ #
+ # There are a few caveats with this use. You *cannot* write headers after the
+ # response has been committed (Response#committed? will return truthy).
+ # Calling +write+ or +close+ on the response stream will cause the response
+ # object to be committed. Make sure all headers are set before calling write
+ # or close on your stream.
+ #
+ # You *must* call close on your stream when you're finished, otherwise the
+ # socket may be left open forever.
+ #
+ # The final caveat is that your actions are executed in a separate thread than
+ # the main thread. Make sure your actions are thread safe, and this shouldn't
+ # be a problem (don't share state across threads, etc).
+ module Live
+ class Buffer < ActionDispatch::Response::Buffer #:nodoc:
+ def initialize(response)
+ super(response, Queue.new)
+ end
+
+ def write(string)
+ unless @response.committed?
+ @response.headers["Cache-Control"] = "no-cache"
+ @response.headers.delete("Content-Length")
+ end
+
+ super
+ end
+
+ def each
+ while str = @buf.pop
+ yield str
+ end
+ end
+
+ def close
+ super
+ @buf.push nil
+ end
+ end
+
+ class Response < ActionDispatch::Response #:nodoc: all
+ class Header < DelegateClass(Hash)
+ def initialize(response, header)
+ @response = response
+ super(header)
+ end
+
+ def []=(k,v)
+ if @response.committed?
+ raise ActionDispatch::IllegalStateError, 'header already sent'
+ end
+
+ super
+ end
+
+ def to_hash
+ __getobj__.dup
+ end
+ end
+
+ def initialize(status = 200, header = {}, body = [])
+ header = Header.new self, header
+ super(status, header, body)
+ end
+
+ def commit!
+ headers.freeze
+ super
+ end
+
+ private
+
+ def build_buffer(response, body)
+ buf = Live::Buffer.new response
+ body.each { |part| buf.write part }
+ buf
+ end
+ end
+
+ def process(name)
+ t1 = Thread.current
+ locals = t1.keys.map { |key| [key, t1[key]] }
+
+ # This processes the action in a child thread. It lets us return the
+ # response code and headers back up the rack stack, and still process
+ # the body in parallel with sending data to the client
+ Thread.new {
+ t2 = Thread.current
+ t2.abort_on_exception = true
+
+ # Since we're processing the view in a different thread, copy the
+ # thread locals from the main thread to the child thread. :'(
+ locals.each { |k,v| t2[k] = v }
+
+ begin
+ super(name)
+ ensure
+ @_response.commit!
+ end
+ }
+
+ @_response.await_commit
+ end
+
+ def response_body=(body)
+ super
+ response.stream.close if response
+ end
+
+ def set_response!(request)
+ if request.env["HTTP_VERSION"] == "HTTP/1.0"
+ super
+ else
+ @_response = Live::Response.new
+ @_response.request = request
+ end
+ end
+ end
+end
diff --git a/actionpack/lib/action_controller/test_case.rb b/actionpack/lib/action_controller/test_case.rb
index c4c825ba6b..ca1ecc43a1 100644
--- a/actionpack/lib/action_controller/test_case.rb
+++ b/actionpack/lib/action_controller/test_case.rb
@@ -517,8 +517,8 @@ module ActionController
end
def setup_controller_request_and_response
- @request = TestRequest.new
- @response = TestResponse.new
+ @request = build_request
+ @response = build_response
@response.request = @request
@controller = nil unless defined? @controller
@@ -539,6 +539,14 @@ module ActionController
end
end
+ def build_request
+ TestRequest.new
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
included do
include ActionController::TemplateAssertions
include ActionDispatch::Assertions
diff --git a/actionpack/lib/action_dispatch.rb b/actionpack/lib/action_dispatch.rb
index 1e4ac70f3d..c259b865cc 100644
--- a/actionpack/lib/action_dispatch.rb
+++ b/actionpack/lib/action_dispatch.rb
@@ -36,6 +36,9 @@ end
module ActionDispatch
extend ActiveSupport::Autoload
+ class IllegalStateError < StandardError
+ end
+
autoload_under 'http' do
autoload :Request
autoload :Response
diff --git a/actionpack/lib/action_dispatch/http/cache.rb b/actionpack/lib/action_dispatch/http/cache.rb
index 0b895e7860..647a9e3c19 100644
--- a/actionpack/lib/action_dispatch/http/cache.rb
+++ b/actionpack/lib/action_dispatch/http/cache.rb
@@ -86,21 +86,29 @@ module ActionDispatch
CACHE_CONTROL = "Cache-Control".freeze
SPESHUL_KEYS = %w[extras no-cache max-age public must-revalidate]
+ def cache_control_segments
+ if cache_control = self[CACHE_CONTROL]
+ cache_control.delete(' ').split(',')
+ else
+ []
+ end
+ end
+
def cache_control_headers
cache_control = {}
- if cc = self[CACHE_CONTROL]
- cc.delete(' ').split(',').each do |segment|
- directive, argument = segment.split('=', 2)
- case directive
- when *SPESHUL_KEYS
- key = directive.tr('-', '_')
- cache_control[key.to_sym] = argument || true
- else
- cache_control[:extras] ||= []
- cache_control[:extras] << segment
- end
+
+ cache_control_segments.each do |segment|
+ directive, argument = segment.split('=', 2)
+
+ if SPESHUL_KEYS.include? directive
+ key = directive.tr('-', '_')
+ cache_control[key.to_sym] = argument || true
+ else
+ cache_control[:extras] ||= []
+ cache_control[:extras] << segment
end
end
+
cache_control
end
diff --git a/actionpack/lib/action_dispatch/http/response.rb b/actionpack/lib/action_dispatch/http/response.rb
index fa48594c93..17e74656af 100644
--- a/actionpack/lib/action_dispatch/http/response.rb
+++ b/actionpack/lib/action_dispatch/http/response.rb
@@ -216,17 +216,7 @@ module ActionDispatch # :nodoc:
end
def to_a
- assign_default_content_type_and_charset!
- handle_conditional_get!
-
- @header[SET_COOKIE] = @header[SET_COOKIE].join("\n") if @header[SET_COOKIE].respond_to?(:join)
-
- if [204, 304].include?(@status)
- @header.delete CONTENT_TYPE
- [@status, @header, []]
- else
- [@status, @header, self]
- end
+ rack_response @status, @header.to_hash
end
alias prepare! to_a
alias to_ary to_a # For implicit splat on 1.9.2
@@ -258,7 +248,7 @@ module ActionDispatch # :nodoc:
body.respond_to?(:each) ? body : [body]
end
- def assign_default_content_type_and_charset!
+ def assign_default_content_type_and_charset!(headers)
return if headers[CONTENT_TYPE].present?
@content_type ||= Mime::HTML
@@ -269,5 +259,19 @@ module ActionDispatch # :nodoc:
headers[CONTENT_TYPE] = type
end
+
+ def rack_response(status, header)
+ assign_default_content_type_and_charset!(header)
+ handle_conditional_get!
+
+ header[SET_COOKIE] = header[SET_COOKIE].join("\n") if header[SET_COOKIE].respond_to?(:join)
+
+ if [204, 304].include?(@status)
+ header.delete CONTENT_TYPE
+ [status, header, []]
+ else
+ [status, header, self]
+ end
+ end
end
end
diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb
new file mode 100644
index 0000000000..5ce1acccf1
--- /dev/null
+++ b/actionpack/test/controller/live_stream_test.rb
@@ -0,0 +1,120 @@
+require 'abstract_unit'
+require 'active_support/concurrency/latch'
+
+module ActionController
+ class LiveStreamTest < ActionController::TestCase
+ class TestController < ActionController::Base
+ include ActionController::Live
+
+ attr_accessor :latch, :tc
+
+ def self.controller_path
+ 'test'
+ end
+
+ def render_text
+ render :text => 'zomg'
+ end
+
+ def default_header
+ response.stream.write "<html><body>hi</body></html>"
+ response.stream.close
+ end
+
+ def basic_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+
+ def blocking_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ latch.await
+ end
+ response.stream.close
+ end
+
+ def thread_locals
+ tc.assert_equal 'aaron', Thread.current[:setting]
+ tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread]
+
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+ end
+
+ tests TestController
+
+ class TestResponse < Live::Response
+ def recycle!
+ initialize
+ end
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
+ def test_set_response!
+ @controller.set_response!(@request)
+ assert_kind_of(Live::Response, @controller.response)
+ assert_equal @request, @controller.response.request
+ end
+
+ def test_write_to_stream
+ @controller = TestController.new
+ get :basic_stream
+ assert_equal "helloworld", @response.body
+ assert_equal 'text/event-stream', @response.headers['Content-Type']
+ end
+
+ def test_async_stream
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ parts = ['hello', 'world']
+
+ @controller.request = @request
+ @controller.response = @response
+
+ t = Thread.new(@response) { |resp|
+ resp.stream.each do |part|
+ assert_equal parts.shift, part
+ ol = @controller.latch
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ ol.release
+ end
+ }
+
+ @controller.process :blocking_stream
+
+ assert t.join
+ end
+
+ def test_thread_locals_get_copied
+ @controller.tc = self
+ Thread.current[:originating_thread] = Thread.current.object_id
+ Thread.current[:setting] = 'aaron'
+
+ get :thread_locals
+ end
+
+ def test_live_stream_default_header
+ @controller.request = @request
+ @controller.response = @response
+ @controller.process :default_header
+ _, headers, _ = @response.prepare!
+ assert headers['Content-Type']
+ end
+
+ def test_render_text
+ get :render_text
+ assert @response.stream.closed?, 'stream should be closed'
+ end
+ end
+end
diff --git a/actionpack/test/dispatch/live_response_test.rb b/actionpack/test/dispatch/live_response_test.rb
new file mode 100644
index 0000000000..87a6b1383d
--- /dev/null
+++ b/actionpack/test/dispatch/live_response_test.rb
@@ -0,0 +1,66 @@
+require 'abstract_unit'
+require 'active_support/concurrency/latch'
+
+module ActionController
+ module Live
+ class ResponseTest < ActiveSupport::TestCase
+ def setup
+ @response = Live::Response.new
+ end
+
+ def test_parallel
+ latch = ActiveSupport::Concurrency::Latch.new
+
+ t = Thread.new {
+ @response.stream.write 'foo'
+ latch.await
+ @response.stream.close
+ }
+
+ @response.each do |part|
+ assert_equal 'foo', part
+ latch.release
+ end
+ assert t.join
+ end
+
+ def test_setting_body_populates_buffer
+ @response.body = 'omg'
+ @response.close
+ assert_equal ['omg'], @response.body_parts
+ end
+
+ def test_cache_control_is_set
+ @response.stream.write 'omg'
+ assert_equal 'no-cache', @response.headers['Cache-Control']
+ end
+
+ def test_content_length_is_removed
+ @response.headers['Content-Length'] = "1234"
+ @response.stream.write 'omg'
+ assert_nil @response.headers['Content-Length']
+ end
+
+ def test_headers_cannot_be_written_after_write
+ @response.stream.write 'omg'
+
+ assert @response.headers.frozen?
+ e = assert_raises(ActionDispatch::IllegalStateError) do
+ @response.headers['Content-Length'] = "zomg"
+ end
+
+ assert_equal 'header already sent', e.message
+ end
+
+ def test_headers_cannot_be_written_after_close
+ @response.stream.close
+
+ assert @response.headers.frozen?
+ e = assert_raises(ActionDispatch::IllegalStateError) do
+ @response.headers['Content-Length'] = "zomg"
+ end
+ assert_equal 'header already sent', e.message
+ end
+ end
+ end
+end