diff options
Diffstat (limited to 'actioncable/lib/action_cable/server')
6 files changed, 295 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb new file mode 100644 index 0000000000..c3b64299e3 --- /dev/null +++ b/actioncable/lib/action_cable/server/base.rb @@ -0,0 +1,73 @@ +require 'thread' + +module ActionCable + module Server + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. + # + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. + class Base + include ActionCable::Server::Broadcasting + include ActionCable::Server::Connections + + cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new } + + def self.logger; config.logger; end + delegate :logger, to: :config + + attr_reader :mutex + + def initialize + @mutex = Mutex.new + @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil + end + + # Called by Rack to setup the server. + def call(env) + setup_heartbeat_timer + config.connection_class.new(self, env).process + end + + # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections. + def disconnect(identifiers) + remote_connections.where(identifiers).disconnect + end + + # Gateway to RemoteConnections. See that class for details. + def remote_connections + @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } + end + + def stream_event_loop + @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } + end + + # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + def worker_pool + @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } + end + + # Requires and returns a hash of all of the channel class constants, which are keyed by name. + def channel_classes + @channel_classes || @mutex.synchronize do + @channel_classes ||= begin + config.channel_paths.each { |channel_path| require channel_path } + config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + end + end + end + + # Adapter used for all streams/broadcasting. + def pubsub + @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } + end + + # All of the identifiers applied to the connection class associated with this server. + def connection_identifiers + config.connection_class.identifiers + end + end + + ActiveSupport.run_load_hooks(:action_cable, Base.config) + end +end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb new file mode 100644 index 0000000000..98025f27f2 --- /dev/null +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -0,0 +1,47 @@ +module ActionCable + module Server + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these + # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: + # + # class WebNotificationsChannel < ApplicationCable::Channel + # def subscribed + # stream_from "web_notifications_#{current_user.id}" + # end + # end + # + # # Somewhere in your app this is called, perhaps from a NewCommentJob: + # ActionCable.server.broadcast \ + # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } + # + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: + # App.cable.subscriptions.create "WebNotificationsChannel", + # received: (data) -> + # new Notification data['title'], body: data['body'] + module Broadcasting + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. + def broadcast(broadcasting, message) + broadcaster_for(broadcasting).broadcast(message) + end + + # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that + # may need multiple spots to transmit to a specific broadcasting over and over. + def broadcaster_for(broadcasting) + Broadcaster.new(self, String(broadcasting)) + end + + private + class Broadcaster + attr_reader :server, :broadcasting + + def initialize(server, broadcasting) + @server, @broadcasting = server, broadcasting + end + + def broadcast(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" + server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + end + end + end + end +end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb new file mode 100644 index 0000000000..9a7301287c --- /dev/null +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -0,0 +1,48 @@ +module ActionCable + module Server + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration + # in a Rails config initializer. + class Configuration + attr_accessor :logger, :log_tags + attr_accessor :connection_class, :worker_pool_size + attr_accessor :disable_request_forgery_protection, :allowed_request_origins + attr_accessor :cable, :url, :mount_path + + attr_accessor :channel_paths # :nodoc: + + def initialize + @log_tags = [] + + @connection_class = ActionCable::Connection::Base + @worker_pool_size = 100 + + @disable_request_forgery_protection = false + end + + def channel_class_names + @channel_class_names ||= channel_paths.collect do |channel_path| + Pathname.new(channel_path).basename.to_s.split('.').first.camelize + end + end + + # Returns constant of subscription adapter specified in config/cable.yml. + # If the adapter cannot be found, this will default to the Redis adapter. + # Also makes sure proper dependencies are required. + def pubsub_adapter + adapter = (cable.fetch('adapter') { 'redis' }) + path_to_adapter = "action_cable/subscription_adapter/#{adapter}" + begin + require path_to_adapter + rescue Gem::LoadError => e + raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)." + rescue LoadError => e + raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace + end + + adapter = adapter.camelize + adapter = 'PostgreSQL' if adapter == 'Postgresql' + "ActionCable::SubscriptionAdapter::#{adapter}".constantize + end + end + end +end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb new file mode 100644 index 0000000000..4dc8934b25 --- /dev/null +++ b/actioncable/lib/action_cable/server/connections.rb @@ -0,0 +1,34 @@ +module ActionCable + module Server + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: + BEAT_INTERVAL = 3 + + def connections + @connections ||= [] + end + + def add_connection(connection) + connections << connection + end + + def remove_connection(connection) + connections.delete connection + end + + # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do + Concurrent.global_io_executor.post { connections.map(&:beat) } + end.tap(&:execute) + end + + def open_connections_statistics + connections.map(&:statistics) + end + end + end +end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..b920b880db --- /dev/null +++ b/actioncable/lib/action_cable/server/worker.rb @@ -0,0 +1,71 @@ +require 'active_support/callbacks' +require 'active_support/core_ext/module/attribute_accessors_per_thread' +require 'concurrent' + +module ActionCable + module Server + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: + include ActiveSupport::Callbacks + + thread_mattr_accessor :connection + define_callbacks :work + include ActiveRecordConnectionManagement + + def initialize(max_size: 5) + @pool = Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: max_size, + max_queue: 0, + ) + end + + def async_invoke(receiver, method, *args) + @pool.post do + invoke(receiver, method, *args) + end + end + + def invoke(receiver, method, *args) + begin + self.connection = receiver + + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + ensure + self.connection = nil + end + end + + def async_run_periodic_timer(channel, callback) + @pool.post do + run_periodic_timer(channel, callback) + end + end + + def run_periodic_timer(channel, callback) + begin + self.connection = channel.connection + + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + ensure + self.connection = nil + end + end + + private + + def logger + ActionCable.server.logger + end + end + end +end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb new file mode 100644 index 0000000000..1ac8934410 --- /dev/null +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -0,0 +1,22 @@ +module ActionCable + module Server + class Worker + # Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections. + module ActiveRecordConnectionManagement + extend ActiveSupport::Concern + + included do + if defined?(ActiveRecord::Base) + set_callback :work, :around, :with_database_connections + end + end + + def with_database_connections + connection.logger.tag(ActiveRecord::Base.logger) { yield } + ensure + ActiveRecord::Base.clear_active_connections! + end + end + end + end +end |