aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb20
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb2
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb19
-rw-r--r--actioncable/lib/action_cable/connection/base.rb52
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb15
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb2
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb5
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb18
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb6
-rw-r--r--actioncable/lib/action_cable/engine.rb44
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/helpers/action_cable_helper.rb27
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb23
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb12
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb19
-rw-r--r--actioncable/lib/action_cable/server/connections.rb9
-rw-r--r--actioncable/lib/action_cable/server/worker.rb52
-rw-r--r--actioncable/lib/action_cable/server/worker/active_record_connection_management.rb5
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb12
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb12
21 files changed, 228 insertions, 132 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 874ebe2e71..714d9887d4 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -32,8 +32,8 @@ module ActionCable
#
# == Action processing
#
- # Unlike subclasses of ActionController::Base, channels do not follow a REST
- # constraint form for their actions. Instead, ActionCable operates through a
+ # Unlike subclasses of ActionController::Base, channels do not follow a RESTful
+ # constraint form for their actions. Instead, Action Cable operates through a
# remote-procedure call model. You can declare any public method on the
# channel (optionally taking a <tt>data</tt> argument), and this method is
# automatically exposed as callable to the client.
@@ -63,10 +63,10 @@ module ActionCable
# end
# end
#
- # In this example, subscribed/unsubscribed are not callable methods, as they
+ # In this example, the subscribed and unsubscribed methods are not callable methods, as they
# were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
# and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
- # callable as it's a private method. You'll see that appear accepts a data
+ # callable, since it's a private method. You'll see that appear accepts a data
# parameter, which it then uses as part of its model call. <tt>#away</tt>
# does not, since it's simply a trigger action.
#
@@ -125,7 +125,7 @@ module ActionCable
protected
# action_methods are cached and there is sometimes need to refresh
# them. ::clear_action_methods! allows you to do that, so next time
- # you run action_methods, they will be recalculated
+ # you run action_methods, they will be recalculated.
def clear_action_methods!
@action_methods = nil
end
@@ -166,9 +166,9 @@ module ActionCable
end
end
- # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
+ # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
- def unsubscribe_from_channel
+ def unsubscribe_from_channel # :nodoc:
run_callbacks :unsubscribe do
unsubscribed
end
@@ -183,7 +183,7 @@ module ActionCable
end
# Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
- # people as offline or the like.
+ # users as offline or the like.
def unsubscribed
# Override in subclasses
end
@@ -191,7 +191,7 @@ module ActionCable
# Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
- logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
+ logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via }
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
end
@@ -224,7 +224,6 @@ module ActionCable
end
end
-
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
@@ -237,7 +236,6 @@ module ActionCable
end
end
-
def extract_action(data)
(data['action'].presence || :receive).to_sym
end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 56597d02d7..0f6e854520 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -12,7 +12,7 @@ module ActionCable
end
module ClassMethods
- # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
+ # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
# for sending a steady flow of updates to a client based off an object that was configured on subscription.
# It's an alternative to using streams if the channel is able to do the work internally.
def periodically(callback, every:)
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 3158f30814..431a5c1063 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -1,8 +1,8 @@
module ActionCable
module Channel
- # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
- # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
- # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later.
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
+ # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
+ # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
@@ -18,8 +18,10 @@ module ActionCable
# end
# end
#
- # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there.
- # That looks like so from that side of things:
+ # Based on the above example, the subscribers of this channel will get whatever data is put into the,
+ # let's say, `comments_for_45` broadcasting as soon as it's put there.
+ #
+ # An example broadcasting for this channel looks like so:
#
# ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
#
@@ -37,8 +39,8 @@ module ActionCable
#
# CommentsChannel.broadcast_to(@post, @comment)
#
- # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out.
- # Example below shows how you can use this to provide performance introspection in the process:
+ # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out.
+ # The below example shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
@@ -70,7 +72,8 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
- # Hold off the confirmation until pubsub#subscribe is successful
+ broadcasting = String(broadcasting)
+ # Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
callback ||= default_stream_callback(broadcasting)
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index b5f898436a..afe0d958d7 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -2,9 +2,9 @@ require 'action_dispatch'
module ActionCable
module Connection
- # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
- # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
- # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
+ # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
@@ -33,9 +33,9 @@ module ActionCable
# end
# end
#
- # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
- # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
- # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
+ # established for that current_user (and potentially disconnect them). You can declare as many
+ # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
@@ -48,12 +48,13 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool
+ delegate :stream_event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
+ @worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@@ -65,8 +66,8 @@ module ActionCable
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
- # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
- def process
+ # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
+ def process # :nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
@@ -76,7 +77,7 @@ module ActionCable
end
end
- # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
+ # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
# The data is routed to the proper channel that the connection has subscribed to.
def receive(data_in_json)
if websocket.alive?
@@ -88,7 +89,7 @@ module ActionCable
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
- def transmit(data)
+ def transmit(data) # :nodoc:
websocket.transmit data
end
@@ -154,7 +155,7 @@ module ActionCable
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
- beat
+ confirm_connection_monitor_subscription
message_buffer.process!
server.add_connection(self)
@@ -173,6 +174,13 @@ module ActionCable
disconnect if respond_to?(:disconnect)
end
+ def confirm_connection_monitor_subscription
+ # Send confirmation message to the internal connection monitor channel.
+ # This ensures the connection monitor state is reset after a successful
+ # websocket connection.
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation])
+ end
+
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
@@ -185,12 +193,14 @@ module ActionCable
end
def respond_to_successful_request
+ logger.info successful_request_message
websocket.rack_response
end
def respond_to_invalid_request
close if websocket.alive?
+ logger.error invalid_request_message
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
@@ -205,7 +215,7 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
@@ -213,10 +223,22 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
+
+ def invalid_request_message
+ 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
+
+ def successful_request_message
+ 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index ef937d7c16..f6b11e93f0 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -50,14 +50,16 @@ module ActionCable
@driver.on(:error) { |e| emit_error(e.message) }
@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @stream.hijack_rack_socket
if callback = @env['async.callback']
callback.call([101, {}, @stream])
end
- end
- def start_driver
- return if @driver.nil? || @driver_started
@driver_started = true
@driver.start
end
@@ -132,11 +134,8 @@ module ActionCable
@ready_state = CLOSING
@close_params = [reason, code]
- if @stream
- @stream.shutdown
- else
- finalize_close
- end
+ @stream.shutdown if @stream
+ finalize_close
end
def finalize_close
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
index 885ff3f102..4a54044aff 100644
--- a/actioncable/lib/action_cable/connection/identification.rb
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -12,7 +12,7 @@ module ActionCable
class_methods do
# Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
- # Common identifiers are current_user and current_account, but could be anything really.
+ # Common identifiers are current_user and current_account, but could be anything, really.
#
# Note that anything marked as an identifier will automatically create a delegate by the same name on any
# channel instances created off the connection.
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
index 2f65a1e84a..19f2e6e918 100644
--- a/actioncable/lib/action_cable/connection/message_buffer.rb
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -1,8 +1,7 @@
module ActionCable
module Connection
- # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
- # Entirely internal operation and should not be used directly by the user.
- class MessageBuffer
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them.
+ class MessageBuffer # :nodoc:
def initialize(connection)
@connection = connection
@buffered_messages = []
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index ace250cd16..2d97b28c09 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -4,15 +4,13 @@ module ActionCable
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
- class Stream
+ class Stream # :nodoc:
def initialize(event_loop, socket)
@event_loop = event_loop
@socket_object = socket
@stream_send = socket.env['stream.send']
@rack_hijack_io = nil
-
- hijack_rack_socket
end
def each(&callback)
@@ -39,16 +37,16 @@ module ActionCable
@socket_object.parse(data)
end
- private
- def hijack_rack_socket
- return unless @socket_object.env['rack.hijack']
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
- @socket_object.env['rack.hijack'].call
- @rack_hijack_io = @socket_object.env['rack.hijack_io']
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
- @event_loop.attach(@rack_hijack_io, self)
- end
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+ private
def clean_rack_hijack
return unless @rack_hijack_io
@event_loop.detach(@rack_hijack_io, self)
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index d7f95e6a62..3742f248d1 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access'
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
- # the connection to the proper channel. Should not be used directly by the user.
- class Subscriptions
+ # the connection to the proper channel.
+ class Subscriptions # :nodoc:
def initialize(connection)
@connection = connection
@subscriptions = {}
@@ -54,7 +54,7 @@ module ActionCable
end
def unsubscribe_from_all
- subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
+ subscriptions.each { |id, channel| remove_subscription(channel) }
end
protected
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index f5e233e091..c90aadaf2c 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access"
module ActionCable
class Railtie < Rails::Engine # :nodoc:
config.action_cable = ActiveSupport::OrderedOptions.new
- config.action_cable.url = '/cable'
+ config.action_cable.mount_path = '/cable'
config.eager_load_namespaces << ActionCable
@@ -31,8 +31,50 @@ module ActionCable
self.cable = Rails.application.config_for(config_path).with_indifferent_access
end
+ if 'ApplicationCable::Connection'.safe_constantize
+ self.connection_class = ApplicationCable::Connection
+ end
+
+ self.channel_paths = Rails.application.paths['app/channels'].existent
+
options.each { |k,v| send("#{k}=", v) }
end
end
+
+ initializer "action_cable.routes" do
+ config.after_initialize do |app|
+ config = app.config
+ unless config.action_cable.mount_path.nil?
+ app.routes.prepend do
+ mount ActionCable.server => config.action_cable.mount_path, internal: true
+ end
+ end
+ end
+ end
+
+ initializer "action_cable.set_work_hooks" do |app|
+ ActiveSupport.on_load(:action_cable) do
+ ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
+ app.executor.wrap do
+ # If we took a while to get the lock, we may have been halted
+ # in the meantime. As we haven't started doing any real work
+ # yet, we should pretend that we never made it off the queue.
+ unless stopping?
+ inner.call
+ end
+ end
+ end
+
+ wrap = lambda do |_, inner|
+ app.executor.wrap(&inner)
+ end
+ ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
+ ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
+
+ app.reloader.before_class_unload do
+ ActionCable.server.restart
+ end
+ end
+ end
end
end
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index a71603e61a..67adeefaff 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta2"
+ PRE = "beta3"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
index b82751468a..2081a37db6 100644
--- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb
+++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
@@ -1,28 +1,39 @@
module ActionCable
module Helpers
module ActionCableHelper
- # Returns an "action-cable-url" meta tag with the value of the url specified in your
- # configuration. Ensure this is above your javascript tag:
+ # Returns an "action-cable-url" meta tag with the value of the URL specified in your
+ # configuration. Ensure this is above your JavaScript tag:
#
# <head>
# <%= action_cable_meta_tag %>
# <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %>
# </head>
#
- # This is then used by ActionCable to determine the url of your websocket server.
+ # This is then used by Action Cable to determine the URL of your WebSocket server.
# Your CoffeeScript can then connect to the server without needing to specify the
- # url directly:
+ # URL directly:
#
# #= require cable
# @App = {}
# App.cable = Cable.createConsumer()
#
- # Make sure to specify the correct server location in each of your environments
- # config file:
+ # Make sure to specify the correct server location in each of your environment
+ # config files:
+ #
+ # config.action_cable.mount_path = "/cable123"
+ # <%= action_cable_meta_tag %> would render:
+ # => <meta name="action-cable-url" content="/cable123" />
+ #
+ # config.action_cable.url = "ws://actioncable.com"
+ # <%= action_cable_meta_tag %> would render:
+ # => <meta name="action-cable-url" content="ws://actioncable.com" />
#
- # config.action_cable.url = "ws://example.com:28080"
def action_cable_meta_tag
- tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url
+ tag "meta", name: "action-cable-url", content: (
+ ActionCable.server.config.url ||
+ ActionCable.server.config.mount_path ||
+ raise("No Action Cable URL configured -- please configure this at config.action_cable.url")
+ )
end
end
end
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index 7ec121308a..aeef8abc72 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -13,8 +13,8 @@ module ActionCable
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
#
# This will disconnect all the connections established for
- # <tt>User.find(1)</tt> across all servers running on all machines, because
- # it uses the internal channel that all these servers are subscribed to.
+ # <tt>User.find(1)</tt>, across all servers running on all machines, because
+ # it uses the internal channel that all of these servers are subscribed to.
class RemoteConnections
attr_reader :server
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index fe48c112df..d9a2653cc2 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -2,10 +2,10 @@ require 'thread'
module ActionCable
module Server
- # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
- # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers.
+ # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but
+ # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.
#
- # Also, this is the server instance used for broadcasting. See Broadcasting for details.
+ # Also, this is the server instance used for broadcasting. See Broadcasting for more information.
class Base
include ActionCable::Server::Broadcasting
include ActionCable::Server::Connections
@@ -19,11 +19,10 @@ module ActionCable
def initialize
@mutex = Mutex.new
-
@remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
- # Called by rack to setup the server.
+ # Called by Rack to setup the server.
def call(env)
setup_heartbeat_timer
config.connection_class.new(self, env).process
@@ -34,6 +33,16 @@ module ActionCable
remote_connections.where(identifiers).disconnect
end
+ def restart
+ connections.each(&:close)
+
+ @mutex.synchronize do
+ worker_pool.halt if @worker_pool
+
+ @worker_pool = nil
+ end
+ end
+
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
@@ -48,7 +57,7 @@ module ActionCable
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
- # Requires and returns a hash of all the channel class constants keyed by name.
+ # Requires and returns a hash of all of the channel class constants, which are keyed by name.
def channel_classes
@channel_classes || @mutex.synchronize do
@channel_classes ||= begin
@@ -63,7 +72,7 @@ module ActionCable
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
- # All the identifiers applied to the connection class associated with this server.
+ # All of the identifiers applied to the connection class associated with this server.
def connection_identifiers
config.connection_class.identifiers
end
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 7e8aef45f4..98025f27f2 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,6 +1,6 @@
module ActionCable
module Server
- # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
+ # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these
# broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
#
# class WebNotificationsChannel < ApplicationCable::Channel
@@ -9,24 +9,24 @@ module ActionCable
# end
# end
#
- # # Somewhere in your app this is called, perhaps from a NewCommentJob
+ # # Somewhere in your app this is called, perhaps from a NewCommentJob:
# ActionCable.server.broadcast \
# "web_notifications_1", { title: "New things!", body: "All that's fit for print" }
#
- # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications
+ # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications:
# App.cable.subscriptions.create "WebNotificationsChannel",
# received: (data) ->
# new Notification data['title'], body: data['body']
module Broadcasting
- # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
+ # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded.
def broadcast(broadcasting, message)
broadcaster_for(broadcasting).broadcast(message)
end
- # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that
+ # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that
# may need multiple spots to transmit to a specific broadcasting over and over.
def broadcaster_for(broadcasting)
- Broadcaster.new(self, broadcasting)
+ Broadcaster.new(self, String(broadcasting))
end
private
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 9a248933c4..9a7301287c 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -1,31 +1,24 @@
module ActionCable
module Server
- # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
+ # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
- attr_accessor :channel_load_paths
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
- attr_accessor :cable, :url
+ attr_accessor :cable, :url, :mount_path
+
+ attr_accessor :channel_paths # :nodoc:
def initialize
@log_tags = []
- @connection_class = ApplicationCable::Connection
- @worker_pool_size = 100
-
- @channel_load_paths = [Rails.root.join('app/channels')]
+ @connection_class = ActionCable::Connection::Base
+ @worker_pool_size = 100
@disable_request_forgery_protection = false
end
- def channel_paths
- @channel_paths ||= channel_load_paths.flat_map do |path|
- Dir["#{path}/**/*_channel.rb"]
- end
- end
-
def channel_class_names
@channel_class_names ||= channel_paths.collect do |channel_path|
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 8671dd5ebd..4dc8934b25 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -1,9 +1,8 @@
module ActionCable
module Server
- # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so
- # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
- # As such, this is primarily for internal use.
- module Connections
+ # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so
+ # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that.
+ module Connections # :nodoc:
BEAT_INTERVAL = 3
def connections
@@ -19,7 +18,7 @@ module ActionCable
end
# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
- # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
@heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 3b6c6d44a1..49cbaec0c0 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -4,8 +4,8 @@ require 'concurrent'
module ActionCable
module Server
- # Worker used by Server.send_async to do connection work in threads. Only for internal use.
- class Worker
+ # Worker used by Server.send_async to do connection work in threads.
+ class Worker # :nodoc:
include ActiveSupport::Callbacks
thread_mattr_accessor :connection
@@ -20,6 +20,26 @@ module ActionCable
)
end
+ # Stop processing work: any work that has not already started
+ # running will be discarded from the queue
+ def halt
+ @pool.kill
+ end
+
+ def stopping?
+ @pool.shuttingdown?
+ end
+
+ def work(connection)
+ self.connection = connection
+
+ run_callbacks :work do
+ yield
+ end
+ ensure
+ self.connection = nil
+ end
+
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
@@ -27,19 +47,15 @@ module ActionCable
end
def invoke(receiver, method, *args)
- begin
- self.connection = receiver
-
- run_callbacks :work do
+ work(receiver) do
+ begin
receiver.send method, *args
- end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
- ensure
- self.connection = nil
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ end
end
end
@@ -50,14 +66,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
- begin
- self.connection = channel.connection
-
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- ensure
- self.connection = nil
+ work(channel.connection) do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
index ecece4e270..c1e4aa8103 100644
--- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -1,7 +1,6 @@
module ActionCable
module Server
class Worker
- # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
@@ -13,10 +12,8 @@ module ActionCable
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
- ensure
- ActiveRecord::Base.clear_active_connections!
end
end
end
end
-end \ No newline at end of file
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
index d697548cbd..e6c862959b 100644
--- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -13,6 +13,14 @@ module ActionCable
class EventedRedis < Base # :nodoc:
@@mutex = Mutex.new
+ # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } }
+
+ # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
def initialize(*)
super
@redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
@@ -41,7 +49,7 @@ module ActionCable
def redis_connection_for_subscriptions
ensure_reactor_running
@redis_connection_for_subscriptions || @server.mutex.synchronize do
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
end
@@ -51,7 +59,7 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 7076383efe..6b4236e7d3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -6,6 +6,10 @@ require 'redis'
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
def initialize(*)
super
@listener = nil
@@ -29,7 +33,7 @@ module ActionCable
end
def redis_connection_for_subscriptions
- ::Redis.new(@server.config.cable)
+ redis_connection
end
private
@@ -39,10 +43,14 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts ||= redis_connection
end
end
+ def redis_connection
+ self.class.redis_connector.call(@server.config.cable)
+ end
+
class Listener < SubscriberMap
def initialize(adapter)
super()