aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/server/base.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/server/base.rb')
-rw-r--r--actioncable/lib/action_cable/server/base.rb61
1 files changed, 37 insertions, 24 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index fe48c112df..419eccd73c 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,11 +1,11 @@
-require 'thread'
+require "monitor"
module ActionCable
module Server
- # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
- # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers.
+ # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but
+ # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.
#
- # Also, this is the server instance used for broadcasting. See Broadcasting for details.
+ # Also, this is the server instance used for broadcasting. See Broadcasting for more information.
class Base
include ActionCable::Server::Broadcasting
include ActionCable::Server::Connections
@@ -18,15 +18,14 @@ module ActionCable
attr_reader :mutex
def initialize
- @mutex = Mutex.new
-
- @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ @mutex = Monitor.new
+ @remote_connections = @event_loop = @worker_pool = @pubsub = nil
end
- # Called by rack to setup the server.
+ # Called by Rack to setup the server.
def call(env)
setup_heartbeat_timer
- config.connection_class.new(self, env).process
+ config.connection_class.call.new(self, env).process
end
# Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
@@ -34,38 +33,52 @@ module ActionCable
remote_connections.where(identifiers).disconnect
end
+ def restart
+ connections.each(&:close)
+
+ @mutex.synchronize do
+ # Shutdown the worker pool
+ @worker_pool.halt if @worker_pool
+ @worker_pool = nil
+
+ # Shutdown the pub/sub adapter
+ @pubsub.shutdown if @pubsub
+ @pubsub = nil
+ end
+ end
+
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
- def stream_event_loop
- @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
+ def event_loop
+ @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
- # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
+ # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
+ # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
+ # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>.
+ #
+ # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
+ # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
+ # connections.
+ #
+ # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe
+ # the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger
+ # database connection pool instead.
def worker_pool
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
- # Requires and returns a hash of all the channel class constants keyed by name.
- def channel_classes
- @channel_classes || @mutex.synchronize do
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
- end
- end
- end
-
# Adapter used for all streams/broadcasting.
def pubsub
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
- # All the identifiers applied to the connection class associated with this server.
+ # All of the identifiers applied to the connection class associated with this server.
def connection_identifiers
- config.connection_class.identifiers
+ config.connection_class.call.identifiers
end
end