aboutsummaryrefslogtreecommitdiffstats
path: root/actionpack/lib/action_controller/metal/live.rb
blob: b0fe0d80ab90ec06ffbb6ce0917ff36339f94846 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
require 'action_dispatch/http/response'
require 'delegate'

module ActionController # :nodoc:
  # Mix this module in to your controller, and all actions in that controller
  # will be able to stream data to the client as it's written.  For example:
  #
  #   class MyController < ActionController::Base
  #     include ActionController::Live
  #
  #     def stream
  #       response.headers['Content-Type'] = 'text/event-stream'
  #       100.times {
  #         response.stream.write "hello world\n"
  #         sleep 1
  #       }
  #       response.stream.close
  #     end
  #   end
  #
  # There are a few caveats with this use.  You *cannot* write headers after the
  # response has been committed (Response#committed? will return truthy).
  # Calling +write+ or +close+ on the response stream will cause the response
  # object to be committed.  Make sure all headers are set before calling write
  # or close on your stream.
  #
  # The other caveat is that you actions are executed in a separate thread than
  # the main thread.  Make sure your actions are thread safe, and this shouldn't
  # be a problem (don't share state across threads, etc).
  module Live
    class Buffer < ActionDispatch::Response::Buffer # :nodoc:
      def initialize(response)
        super(response, Queue.new)
      end

      def write(string)
        unless @response.committed?
          @response.headers["Cache-Control"] = "no-cache"
          @response.headers.delete("Content-Length")
        end

        super
      end

      def each
        while str = @buf.pop
          yield str
        end
      end

      def close
        super
        @buf.push nil
      end
    end

    class Response < ActionDispatch::Response # :nodoc:
      class Header < DelegateClass(Hash) # :nodoc:
        def initialize(response, header)
          @response = response
          super(header)
        end

        def []=(k,v)
          if @response.committed?
            raise ActionDispatch::IllegalStateError, 'header already sent'
          end

          super
        end
      end

      def initialize(status = 200, header = {}, body = [])
        header = Header.new self, header
        super(status, header, body)
      end

      private

      def build_buffer(response, body)
        buf = Live::Buffer.new response
        body.each { |part| buf.write part }
        buf
      end
    end

    def process(name)
      t1 = Thread.current
      locals = t1.keys.map { |key| [key, t1[key]] }

      # This processes the action in a child thread.  It lets us return the
      # response code and headers back up the rack stack, and still process
      # the body in parallel with sending data to the client
      Thread.new {
        t2 = Thread.current
        t2.abort_on_exception = true

        # Since we're processing the view in a different thread, copy the
        # thread locals from the main thread to the child thread. :'(
        locals.each { |k,v| t2[k] = v }

        begin
          super(name)
        ensure
          @_response.commit!
        end
      }

      @_response.await_commit
    end
  end
end