aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/CHANGELOG.md13
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb8
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb57
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb46
-rw-r--r--actioncable/lib/action_cable/server/base.rb2
-rw-r--r--actioncable/test/stubs/test_server.rb4
7 files changed, 114 insertions, 18 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index dec6f7c027..137c88d91b 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,16 @@
+* Prevent race where the client could receive and act upon a
+ subscription confirmation before the channel's `subscribed` method
+ completed.
+
+ Fixes #25381.
+
+ *Vladimir Dementyev*
+
+* Buffer writes to websocket connections, to avoid blocking threads
+ that could be doing more useful things.
+
+ *Matthew Draper*, *Tinco Andringa*
+
* Protect against concurrent writes to a websocket connection from
multiple threads; the underlying OS write is not always threadsafe.
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index e480b93df0..dbba333353 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -69,8 +69,8 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
- # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
- # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
@@ -94,8 +94,8 @@ module ActionCable
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
#
- # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
- # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
def stream_for(model, callback = nil, coder: nil, &block)
stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index b0615b08a1..4c7fcc1434 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -105,7 +105,7 @@ module ActionCable
worker_pool.async_invoke(self, method, *arguments)
end
- # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
# This can be returned by a health check against the connection.
def statistics
{
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 5a2aace0ba..d66e1b4e41 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -14,6 +14,9 @@ module ActionCable
@rack_hijack_io = nil
@write_lock = Mutex.new
+
+ @write_head = nil
+ @write_buffer = Queue.new
end
def each(&callback)
@@ -30,14 +33,62 @@ module ActionCable
end
def write(data)
- @write_lock.synchronize do
- return @rack_hijack_io.write(data) if @rack_hijack_io
- return @stream_send.call(data) if @stream_send
+ if @stream_send
+ return @stream_send.call(data)
end
+
+ if @write_lock.try_lock
+ begin
+ if @write_head.nil? && @write_buffer.empty?
+ written = @rack_hijack_io.write_nonblock(data, exception: false)
+
+ case written
+ when :wait_writable
+ # proceed below
+ when data.bytesize
+ return data.bytesize
+ else
+ @write_head = data.byteslice(written, data.bytesize)
+ @event_loop.writes_pending @rack_hijack_io
+
+ return data.bytesize
+ end
+ end
+ ensure
+ @write_lock.unlock
+ end
+ end
+
+ @write_buffer << data
+ @event_loop.writes_pending @rack_hijack_io
+
+ data.bytesize
rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
+ def flush_write_buffer
+ @write_lock.synchronize do
+ loop do
+ if @write_head.nil?
+ return true if @write_buffer.empty?
+ @write_head = @write_buffer.pop
+ end
+
+ written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
+ case written
+ when :wait_writable
+ return false
+ when @write_head.bytesize
+ @write_head = nil
+ else
+ @write_head = @write_head.byteslice(written, @write_head.bytesize)
+ return false
+ end
+ end
+ end
+ end
+
def receive(data)
@socket_object.parse(data)
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index 106b948c45..eec24638b6 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -5,7 +5,7 @@ module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = @thread = nil
+ @nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@@ -20,13 +20,14 @@ module ActionCable
def post(task = nil, &block)
task ||= block
- Concurrent.global_io_executor << task
+ spawn
+ @executor << task
end
def attach(io, stream)
@todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
end
wakeup
end
@@ -39,6 +40,15 @@ module ActionCable
wakeup
end
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
+ end
+ wakeup
+ end
+
def stop
@stopping = true
wakeup if @nio
@@ -52,6 +62,13 @@ module ActionCable
return if @thread && @thread.status
@nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
@thread = Thread.new { run }
return true
@@ -77,12 +94,25 @@ module ActionCable
monitors.each do |monitor|
io = monitor.io
- stream = @map[io]
+ stream = monitor.value
begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
rescue
# We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index c700297a8d..dd059a553b 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -54,7 +54,7 @@ module ActionCable
# The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
# The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
- # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`.
+ # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>.
#
# Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
# Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index b64ff33789..02be72d0cb 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -28,7 +28,9 @@ class TestServer
@event_loop ||= if @config.use_faye
ActionCable::Connection::FayeEventLoop.new
else
- ActionCable::Connection::StreamEventLoop.new
+ ActionCable::Connection::StreamEventLoop.new.tap do |loop|
+ loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
+ end
end
end