diff options
Diffstat (limited to 'actioncable/lib/action_cable/connection/stream.rb')
-rw-r--r-- | actioncable/lib/action_cable/connection/stream.rb | 72 |
1 files changed, 66 insertions, 6 deletions
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 0cf59091bc..e658948a55 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -1,3 +1,7 @@ +# frozen_string_literal: true + +require "thread" + module ActionCable module Connection #-- @@ -8,9 +12,13 @@ module ActionCable def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket - @stream_send = socket.env['stream.send'] + @stream_send = socket.env["stream.send"] @rack_hijack_io = nil + @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -27,21 +35,73 @@ module ActionCable end def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) + end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + return unless @socket_object.env["rack.hijack"] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + # This should return the underlying io according to the SPEC: + @rack_hijack_io = @socket_object.env["rack.hijack"].call + # Retain existing behaviour if required: + @rack_hijack_io ||= @socket_object.env["rack.hijack_io"] @event_loop.attach(@rack_hijack_io, self) end |