aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/stream.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream.rb')
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb72
1 files changed, 66 insertions, 6 deletions
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 0cf59091bc..e658948a55 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -1,3 +1,7 @@
+# frozen_string_literal: true
+
+require "thread"
+
module ActionCable
module Connection
#--
@@ -8,9 +12,13 @@ module ActionCable
def initialize(event_loop, socket)
@event_loop = event_loop
@socket_object = socket
- @stream_send = socket.env['stream.send']
+ @stream_send = socket.env["stream.send"]
@rack_hijack_io = nil
+ @write_lock = Mutex.new
+
+ @write_head = nil
+ @write_buffer = Queue.new
end
def each(&callback)
@@ -27,21 +35,73 @@ module ActionCable
end
def write(data)
- return @rack_hijack_io.write(data) if @rack_hijack_io
- return @stream_send.call(data) if @stream_send
+ if @stream_send
+ return @stream_send.call(data)
+ end
+
+ if @write_lock.try_lock
+ begin
+ if @write_head.nil? && @write_buffer.empty?
+ written = @rack_hijack_io.write_nonblock(data, exception: false)
+
+ case written
+ when :wait_writable
+ # proceed below
+ when data.bytesize
+ return data.bytesize
+ else
+ @write_head = data.byteslice(written, data.bytesize)
+ @event_loop.writes_pending @rack_hijack_io
+
+ return data.bytesize
+ end
+ end
+ ensure
+ @write_lock.unlock
+ end
+ end
+
+ @write_buffer << data
+ @event_loop.writes_pending @rack_hijack_io
+
+ data.bytesize
rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
+ def flush_write_buffer
+ @write_lock.synchronize do
+ loop do
+ if @write_head.nil?
+ return true if @write_buffer.empty?
+ @write_head = @write_buffer.pop
+ end
+
+ written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
+ case written
+ when :wait_writable
+ return false
+ when @write_head.bytesize
+ @write_head = nil
+ else
+ @write_head = @write_head.byteslice(written, @write_head.bytesize)
+ return false
+ end
+ end
+ end
+ end
+
def receive(data)
@socket_object.parse(data)
end
def hijack_rack_socket
- return unless @socket_object.env['rack.hijack']
+ return unless @socket_object.env["rack.hijack"]
- @socket_object.env['rack.hijack'].call
- @rack_hijack_io = @socket_object.env['rack.hijack_io']
+ # This should return the underlying io according to the SPEC:
+ @rack_hijack_io = @socket_object.env["rack.hijack"].call
+ # Retain existing behaviour if required:
+ @rack_hijack_io ||= @socket_object.env["rack.hijack_io"]
@event_loop.attach(@rack_hijack_io, self)
end