aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/server
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/server')
-rw-r--r--actioncable/lib/action_cable/server/base.rb40
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb12
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb56
-rw-r--r--actioncable/lib/action_cable/server/connections.rb2
-rw-r--r--actioncable/lib/action_cable/server/worker.rb22
-rw-r--r--actioncable/lib/action_cable/server/worker/active_record_connection_management.rb2
6 files changed, 69 insertions, 65 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index b1a0e11631..1ee03f6dfc 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,4 +1,6 @@
-require 'monitor'
+# frozen_string_literal: true
+
+require "monitor"
module ActionCable
module Server
@@ -10,7 +12,7 @@ module ActionCable
include ActionCable::Server::Broadcasting
include ActionCable::Server::Connections
- cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
+ cattr_accessor :config, instance_accessor: true, default: ActionCable::Server::Configuration.new
def self.logger; config.logger; end
delegate :logger, to: :config
@@ -19,16 +21,16 @@ module ActionCable
def initialize
@mutex = Monitor.new
- @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ @remote_connections = @event_loop = @worker_pool = @pubsub = nil
end
# Called by Rack to setup the server.
def call(env)
setup_heartbeat_timer
- config.connection_class.new(self, env).process
+ config.connection_class.call.new(self, env).process
end
- # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
+ # Disconnect all the connections identified by +identifiers+ on this server or any others via RemoteConnections.
def disconnect(identifiers)
remote_connections.where(identifiers).disconnect
end
@@ -37,9 +39,13 @@ module ActionCable
connections.each(&:close)
@mutex.synchronize do
- worker_pool.halt if @worker_pool
-
+ # Shutdown the worker pool
+ @worker_pool.halt if @worker_pool
@worker_pool = nil
+
+ # Shutdown the pub/sub adapter
+ @pubsub.shutdown if @pubsub
+ @pubsub = nil
end
end
@@ -49,34 +55,24 @@ module ActionCable
end
def event_loop
- @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
+ @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
# The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
- # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size.
+ # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>.
#
# Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
# Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
# connections.
#
# Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe
- # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger
- # db connection pool instead.
+ # the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger
+ # database connection pool instead.
def worker_pool
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
- # Requires and returns a hash of all of the channel class constants, which are keyed by name.
- def channel_classes
- @channel_classes || @mutex.synchronize do
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
- end
- end
- end
-
# Adapter used for all streams/broadcasting.
def pubsub
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
@@ -84,7 +80,7 @@ module ActionCable
# All of the identifiers applied to the connection class associated with this server.
def connection_identifiers
- config.connection_class.identifiers
+ config.connection_class.call.identifiers
end
end
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 8f93564113..bc54d784b3 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these
@@ -38,9 +40,13 @@ module ActionCable
end
def broadcast(message)
- server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}"
- encoded = coder ? coder.encode(message) : message
- server.pubsub.broadcast broadcasting, encoded
+ server.logger.debug "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}"
+
+ payload = { broadcasting: broadcasting, message: message, coder: coder }
+ ActiveSupport::Notifications.instrument("broadcast.action_cable", payload) do
+ encoded = coder ? coder.encode(message) : message
+ server.pubsub.broadcast broadcasting, encoded
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 0bb378cf03..26209537df 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -1,64 +1,56 @@
+# frozen_string_literal: true
+
module ActionCable
module Server
# An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
- attr_accessor :use_faye, :connection_class, :worker_pool_size
- attr_accessor :disable_request_forgery_protection, :allowed_request_origins
+ attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :disable_request_forgery_protection, :allowed_request_origins, :allow_same_origin_as_host
attr_accessor :cable, :url, :mount_path
- attr_accessor :channel_paths # :nodoc:
-
def initialize
@log_tags = []
- @connection_class = ActionCable::Connection::Base
+ @connection_class = -> { ActionCable::Connection::Base }
@worker_pool_size = 4
@disable_request_forgery_protection = false
- end
-
- def channel_class_names
- @channel_class_names ||= channel_paths.collect do |channel_path|
- Pathname.new(channel_path).basename.to_s.split('.').first.camelize
- end
+ @allow_same_origin_as_host = true
end
# Returns constant of subscription adapter specified in config/cable.yml.
# If the adapter cannot be found, this will default to the Redis adapter.
# Also makes sure proper dependencies are required.
def pubsub_adapter
- adapter = (cable.fetch('adapter') { 'redis' })
+ adapter = (cable.fetch("adapter") { "redis" })
+
+ # Require the adapter itself and give useful feedback about
+ # 1. Missing adapter gems and
+ # 2. Adapter gems' missing dependencies.
path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
begin
require path_to_adapter
- rescue Gem::LoadError => e
- raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
rescue LoadError => e
- raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
+ # We couldn't require the adapter itself. Raise an exception that
+ # points out config typos and missing gems.
+ if e.path == path_to_adapter
+ # We can assume that a non-builtin adapter was specified, so it's
+ # either misspelled or missing from Gemfile.
+ raise e.class, "Could not load the '#{adapter}' Action Cable pubsub adapter. Ensure that the adapter is spelled correctly in config/cable.yml and that you've added the necessary adapter gem to your Gemfile.", e.backtrace
+
+ # Bubbled up from the adapter require. Prefix the exception message
+ # with some guidance about how to address it and reraise.
+ else
+ raise e.class, "Error loading the '#{adapter}' Action Cable pubsub adapter. Missing a gem it depends on? #{e.message}", e.backtrace
+ end
end
adapter = adapter.camelize
- adapter = 'PostgreSQL' if adapter == 'Postgresql'
+ adapter = "PostgreSQL" if adapter == "Postgresql"
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
-
- def event_loop_class
- if use_faye
- ActionCable::Connection::FayeEventLoop
- else
- ActionCable::Connection::StreamEventLoop
- end
- end
-
- def client_socket_class
- if use_faye
- ActionCable::Connection::FayeClientSocket
- else
- ActionCable::Connection::ClientSocket
- end
- end
end
end
end
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 5e61b4e335..39557d63a7 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module ActionCable
module Server
# Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index a638ff72e7..c69cc4ac31 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -1,6 +1,8 @@
-require 'active_support/callbacks'
-require 'active_support/core_ext/module/attribute_accessors_per_thread'
-require 'concurrent'
+# frozen_string_literal: true
+
+require "active_support/callbacks"
+require "active_support/core_ext/module/attribute_accessors_per_thread"
+require "concurrent"
module ActionCable
module Server
@@ -25,7 +27,7 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @executor.kill
+ @executor.shutdown
end
def stopping?
@@ -42,16 +44,20 @@ module ActionCable
self.connection = nil
end
- def async_invoke(receiver, method, *args, connection: receiver)
+ def async_exec(receiver, *args, connection:, &block)
+ async_invoke receiver, :instance_exec, *args, connection: connection, &block
+ end
+
+ def async_invoke(receiver, method, *args, connection: receiver, &block)
@executor.post do
- invoke(receiver, method, *args, connection: connection)
+ invoke(receiver, method, *args, connection: connection, &block)
end
end
- def invoke(receiver, method, *args, connection:)
+ def invoke(receiver, method, *args, connection:, &block)
work(connection) do
begin
- receiver.send method, *args
+ receiver.send method, *args, &block
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
index c1e4aa8103..2e378d4bf3 100644
--- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module ActionCable
module Server
class Worker