1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
require 'action_dispatch/http/response'
require 'delegate'
module ActionController
# Mix this module in to your controller, and all actions in that controller
# will be able to stream data to the client as it's written.
#
# class MyController < ActionController::Base
# include ActionController::Live
#
# def stream
# response.headers['Content-Type'] = 'text/event-stream'
# 100.times {
# response.stream.write "hello world\n"
# sleep 1
# }
# ensure
# response.stream.close
# end
# end
#
# There are a few caveats with this use. You *cannot* write headers after the
# response has been committed (Response#committed? will return truthy).
# Calling +write+ or +close+ on the response stream will cause the response
# object to be committed. Make sure all headers are set before calling write
# or close on your stream.
#
# You *must* call close on your stream when you're finished, otherwise the
# socket may be left open forever.
#
# The final caveat is that your actions are executed in a separate thread from
# the main thread. Make sure your actions are thread-safe (don't share state
# across threads, etc.) and this shouldn't be a problem.
module Live
class Buffer < ActionDispatch::Response::Buffer #:nodoc:
  # Hands the parent class a bounded queue (capacity 10) as its backing
  # store, so the buffer holds at most ten pending items at a time.
  def initialize(response)
    bounded_queue = SizedQueue.new(10)
    super(response, bounded_queue)
  end

  # Passes +string+ on to the parent's +write+. While the response is still
  # uncommitted, it first drops any "Content-Length" header and forces
  # "Cache-Control: no-cache".
  def write(string)
    unless @response.committed?
      @response.headers.delete "Content-Length"
      @response.headers["Cache-Control"] = "no-cache"
    end

    super
  end

  # Yields each item popped from the buffer until a falsy item (the +nil+
  # pushed by +close+) is popped.
  # NOTE(review): @buf is presumably the queue given to +super+ in
  # +initialize+ -- confirm against the parent class.
  def each
    chunk = @buf.pop
    while chunk
      yield chunk
      chunk = @buf.pop
    end
  end

  # Runs the parent's +close+, then enqueues +nil+ so that a consumer
  # iterating with +each+ stops.
  def close
    super
    @buf.push(nil)
  end
end
class Response < ActionDispatch::Response #:nodoc: all
  # Hash delegator that remembers which response it belongs to, so that
  # assignments can be rejected once that response has been committed.
  class Header < DelegateClass(Hash)
    # +response+ is the owner consulted by +[]=+; +header+ is the wrapped hash.
    def initialize(response, header)
      @response = response
      super(header)
    end

    # Assigns a header, raising ActionDispatch::IllegalStateError if the
    # owning response reports that it is already committed.
    def []=(k, v)
      raise ActionDispatch::IllegalStateError, 'header already sent' if @response.committed?

      super
    end

    # Returns a new Header (same owning response) wrapping the merged hash,
    # instead of the bare Hash a plain delegated +merge+ would return.
    def merge(other)
      merged = __getobj__.merge(other)
      self.class.new(@response, merged)
    end

    # Returns a shallow copy of the wrapped hash.
    def to_hash
      __getobj__.dup
    end
  end

  # Freezes the headers, then lets the parent class perform the commit.
  def commit!
    headers.freeze
    super
  end

  private

  # Builds a Live::Buffer for +response+ and writes every part of +body+
  # into it; returns the buffer.
  def build_buffer(response, body)
    Live::Buffer.new(response).tap do |live_buffer|
      body.each { |part| live_buffer.write(part) }
    end
  end

  # Wraps whatever the parent produces in a Header bound to this response.
  def merge_default_headers(original, default)
    merged = super
    Header.new(self, merged)
  end

  # Skips the parent's conditional-GET handling once the response is
  # committed.
  def handle_conditional_get!
    return if committed?

    super
  end
end
# Runs the action +name+ in a child thread and blocks the calling thread
# only until the response has been committed. This lets the response code
# and headers travel back up the rack stack while the body is still being
# produced in parallel with sending data to the client.
#
# If the action raises a StandardError *before* the response is committed,
# the error is re-raised in the calling thread once +await_commit+ returns,
# so the caller sees the failure instead of a silently "committed" response.
# Errors raised after the commit are re-raised inside the child thread,
# which has +abort_on_exception+ set.
#
# Returns whatever <tt>@_response.await_commit</tt> returns.
def process(name)
  parent = Thread.current
  locals = parent.keys.map { |key| [key, parent[key]] }
  error  = nil

  Thread.new {
    child = Thread.current
    child.abort_on_exception = true

    # The action runs in a different thread, so copy the thread locals from
    # the calling thread into the child thread. :'(
    locals.each { |k, v| child[k] = v }

    begin
      super(name)
    rescue => e
      # Once the response is committed the caller has already moved on, so
      # the error cannot be handed back; let it propagate from this thread.
      raise if @_response.committed?

      error = e
    ensure
      # Always commit, otherwise the caller would wait in await_commit forever.
      @_response.commit!
    end
  }

  result = @_response.await_commit
  raise error if error

  result
end
# Assigns the body through the parent implementation, then closes the
# response stream when a response object is present.
def response_body=(body)
  super
  return unless response

  response.stream.close
end
# Chooses the response object for +request+. Requests whose HTTP_VERSION is
# "HTTP/1.0" are left to the parent implementation; every other request
# gets a fresh Live::Response bound to it.
# NOTE(review): presumably HTTP/1.0 is excluded because such clients cannot
# consume a streamed body -- confirm.
def set_response!(request)
  return super if request.env["HTTP_VERSION"] == "HTTP/1.0"

  live_response = Live::Response.new
  @_response = live_response
  live_response.request = request
end
end
end
|