diff options
Diffstat (limited to 'actioncable/lib/action_cable/server')
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 12 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/broadcasting.rb | 19 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/configuration.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 30 |
4 files changed, 32 insertions, 31 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 778f5ffeed..b1a0e11631 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -52,7 +52,17 @@ module ActionCable @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # db connection pool instead. def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 98025f27f2..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -19,27 +19,28 @@ module ActionCable # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, String(broadcasting)) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 5fe71caed2..0bb378cf03 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -14,7 +14,7 @@ module ActionCable @log_tags = [] @connection_class = ActionCable::Connection::Base - @worker_pool_size = 100 + @worker_pool_size = 4 @disable_request_forgery_protection = false end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 49cbaec0c0..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e @@ -59,18 +61,6 @@ module ActionCable end end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) - end - end - - def run_periodic_timer(channel, callback) - work(channel.connection) do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - private def logger |