diff options
Diffstat (limited to 'actioncable/lib')
25 files changed, 256 insertions, 133 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 874ebe2e71..714d9887d4 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,8 @@ module ActionCable # # == Action processing # - # Unlike subclasses of ActionController::Base, channels do not follow a REST - # constraint form for their actions. Instead, ActionCable operates through a + # Unlike subclasses of ActionController::Base, channels do not follow a RESTful + # constraint form for their actions. Instead, Action Cable operates through a # remote-procedure call model. You can declare any public method on the # channel (optionally taking a <tt>data</tt> argument), and this method is # automatically exposed as callable to the client. @@ -63,10 +63,10 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they + # In this example, the subscribed and unsubscribed methods are not callable methods, as they # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt> # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not - # callable as it's a private method. You'll see that appear accepts a data + # callable, since it's a private method. You'll see that appear accepts a data # parameter, which it then uses as part of its model call. <tt>#away</tt> # does not, since it's simply a trigger action. # @@ -125,7 +125,7 @@ module ActionCable protected # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time - # you run action_methods, they will be recalculated + # you run action_methods, they will be recalculated. def clear_action_methods! @action_methods = nil end @@ -166,9 +166,9 @@ module ActionCable end end - # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. - def unsubscribe_from_channel + def unsubscribe_from_channel # :nodoc: run_callbacks :unsubscribe do unsubscribed end @@ -183,7 +183,7 @@ module ActionCable end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking - # people as offline or the like. + # users as offline or the like. def unsubscribed # Override in subclasses end @@ -191,7 +191,7 @@ module ActionCable # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } + logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) end @@ -224,7 +224,6 @@ module ActionCable end end - def subscribe_to_channel run_callbacks :subscribe do subscribed @@ -237,7 +236,6 @@ module ActionCable end end - def extract_action(data) (data['action'].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..0f6e854520 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,7 +12,7 @@ module ActionCable end module ClassMethods - # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful + # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful # for sending a steady flow of updates to a client based off an object that was configured on subscription. # It's an alternative to using streams if the channel is able to do the work internally. def periodically(callback, every:) diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 3158f30814..431a5c1063 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,8 +1,8 @@ module ActionCable module Channel - # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data - # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data + # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -18,8 +18,10 @@ module ActionCable # end # end # - # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. - # That looks like so from that side of things: + # Based on the above example, the subscribers of this channel will get whatever data is put into the, + # let's say, `comments_for_45` broadcasting as soon as it's put there. + # + # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # @@ -37,8 +39,8 @@ module ActionCable # # CommentsChannel.broadcast_to(@post, @comment) # - # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. - # Example below shows how you can use this to provide performance introspection in the process: + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. + # The below example shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel # def subscribed @@ -70,7 +72,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - # Hold off the confirmation until pubsub#subscribe is successful + broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! callback ||= default_stream_callback(broadcasting) diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b5f898436a..afe0d958d7 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -2,9 +2,9 @@ require 'action_dispatch' module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,9 +33,9 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many - # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many + # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. @@ -48,12 +48,13 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :stream_event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env + @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @@ -65,8 +66,8 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process # :nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -76,7 +77,7 @@ module ActionCable end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. + # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. # The data is routed to the proper channel that the connection has subscribed to. def receive(data_in_json) if websocket.alive? @@ -88,7 +89,7 @@ module ActionCable # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) + def transmit(data) # :nodoc: websocket.transmit data end @@ -154,7 +155,7 @@ module ActionCable def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel - beat + confirm_connection_monitor_subscription message_buffer.process! server.add_connection(self) @@ -173,6 +174,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def confirm_connection_monitor_subscription + # Send confirmation message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation]) + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection @@ -185,12 +193,14 @@ module ActionCable end def respond_to_successful_request + logger.info successful_request_message websocket.rack_response end def respond_to_invalid_request close if websocket.alive? + logger.error invalid_request_message logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -205,7 +215,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end @@ -213,10 +223,22 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end + + def invalid_request_message + 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end + + def successful_request_message + 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end end end end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index ef937d7c16..f6b11e93f0 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -50,14 +50,16 @@ module ActionCable @driver.on(:error) { |e| emit_error(e.message) } @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + end + + def start_driver + return if @driver.nil? || @driver_started + @stream.hijack_rack_socket if callback = @env['async.callback'] callback.call([101, {}, @stream]) end - end - def start_driver - return if @driver.nil? || @driver_started @driver_started = true @driver.start end @@ -132,11 +134,8 @@ module ActionCable @ready_state = CLOSING @close_params = [reason, code] - if @stream - @stream.shutdown - else - finalize_close - end + @stream.shutdown if @stream + finalize_close end def finalize_close diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..4a54044aff 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..19f2e6e918 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index ace250cd16..2d97b28c09 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -4,15 +4,13 @@ module ActionCable # This class is heavily based on faye-websocket-ruby # # Copyright (c) 2010-2015 James Coglan - class Stream + class Stream # :nodoc: def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket @stream_send = socket.env['stream.send'] @rack_hijack_io = nil - - hijack_rack_socket end def each(&callback) @@ -39,16 +37,16 @@ module ActionCable @socket_object.parse(data) end - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] - @event_loop.attach(@rack_hijack_io, self) - end + @event_loop.attach(@rack_hijack_io, self) + end + private def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access' module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} @@ -54,7 +54,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index f5e233e091..c90aadaf2c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access" module ActionCable class Railtie < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.url = '/cable' + config.action_cable.mount_path = '/cable' config.eager_load_namespaces << ActionCable @@ -31,8 +31,50 @@ module ActionCable self.cable = Rails.application.config_for(config_path).with_indifferent_access end + if 'ApplicationCable::Connection'.safe_constantize + self.connection_class = ApplicationCable::Connection + end + + self.channel_paths = Rails.application.paths['app/channels'].existent + options.each { |k,v| send("#{k}=", v) } end end + + initializer "action_cable.routes" do + config.after_initialize do |app| + config = app.config + unless config.action_cable.mount_path.nil? + app.routes.prepend do + mount ActionCable.server => config.action_cable.mount_path, internal: true + end + end + end + end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index a71603e61a..67adeefaff 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -8,7 +8,7 @@ module ActionCable MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta2" + PRE = "beta3" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index b82751468a..2081a37db6 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -1,28 +1,39 @@ module ActionCable module Helpers module ActionCableHelper - # Returns an "action-cable-url" meta tag with the value of the url specified in your - # configuration. Ensure this is above your javascript tag: + # Returns an "action-cable-url" meta tag with the value of the URL specified in your + # configuration. Ensure this is above your JavaScript tag: # # <head> # <%= action_cable_meta_tag %> # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> # </head> # - # This is then used by ActionCable to determine the url of your websocket server. + # This is then used by Action Cable to determine the URL of your WebSocket server. # Your CoffeeScript can then connect to the server without needing to specify the - # url directly: + # URL directly: # # #= require cable # @App = {} # App.cable = Cable.createConsumer() # - # Make sure to specify the correct server location in each of your environments - # config file: + # Make sure to specify the correct server location in each of your environment + # config files: + # + # config.action_cable.mount_path = "/cable123" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="/cable123" /> + # + # config.action_cable.url = "ws://actioncable.com" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="ws://actioncable.com" /> # - # config.action_cable.url = "ws://example.com:28080" def action_cable_meta_tag - tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url + tag "meta", name: "action-cable-url", content: ( + ActionCable.server.config.url || + ActionCable.server.config.mount_path || + raise("No Action Cable URL configured -- please configure this at config.action_cable.url") + ) end end end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 7ec121308a..aeef8abc72 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -13,8 +13,8 @@ module ActionCable # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # # This will disconnect all the connections established for - # <tt>User.find(1)</tt> across all servers running on all machines, because - # it uses the internal channel that all these servers are subscribed to. + # <tt>User.find(1)</tt>, across all servers running on all machines, because + # it uses the internal channel that all of these servers are subscribed to. class RemoteConnections attr_reader :server diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index fe48c112df..d9a2653cc2 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -2,10 +2,10 @@ require 'thread' module ActionCable module Server - # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but - # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers. + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. # - # Also, this is the server instance used for broadcasting. See Broadcasting for details. + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. class Base include ActionCable::Server::Broadcasting include ActionCable::Server::Connections @@ -19,11 +19,10 @@ module ActionCable def initialize @mutex = Mutex.new - @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil end - # Called by rack to setup the server. + # Called by Rack to setup the server. def call(env) setup_heartbeat_timer config.connection_class.new(self, env).process @@ -34,6 +33,16 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } @@ -48,7 +57,7 @@ module ActionCable @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end - # Requires and returns a hash of all the channel class constants keyed by name. + # Requires and returns a hash of all of the channel class constants, which are keyed by name. def channel_classes @channel_classes || @mutex.synchronize do @channel_classes ||= begin @@ -63,7 +72,7 @@ module ActionCable @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end - # All the identifiers applied to the connection class associated with this server. + # All of the identifiers applied to the connection class associated with this server. def connection_identifiers config.connection_class.identifiers end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 7e8aef45f4..98025f27f2 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,6 +1,6 @@ module ActionCable module Server - # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel @@ -9,24 +9,24 @@ module ActionCable # end # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob + # # Somewhere in your app this is called, perhaps from a NewCommentJob: # ActionCable.server.broadcast \ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: # App.cable.subscriptions.create "WebNotificationsChannel", # received: (data) -> # new Notification data['title'], body: data['body'] module Broadcasting - # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded. + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. def broadcast(broadcasting, message) broadcaster_for(broadcasting).broadcast(message) end - # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that + # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. def broadcaster_for(broadcasting) - Broadcaster.new(self, broadcasting) + Broadcaster.new(self, String(broadcasting)) end private diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a248933c4..9a7301287c 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -1,31 +1,24 @@ module ActionCable module Server - # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size - attr_accessor :channel_load_paths attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :cable, :url + attr_accessor :cable, :url, :mount_path + + attr_accessor :channel_paths # :nodoc: def initialize @log_tags = [] - @connection_class = ApplicationCable::Connection - @worker_pool_size = 100 - - @channel_load_paths = [Rails.root.join('app/channels')] + @connection_class = ActionCable::Connection::Base + @worker_pool_size = 100 @disable_request_forgery_protection = false end - def channel_paths - @channel_paths ||= channel_load_paths.flat_map do |path| - Dir["#{path}/**/*_channel.rb"] - end - end - def channel_class_names @channel_class_names ||= channel_paths.collect do |channel_path| Pathname.new(channel_path).basename.to_s.split('.').first.camelize diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..4dc8934b25 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -1,9 +1,8 @@ module ActionCable module Server - # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so - # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. - # As such, this is primarily for internal use. - module Connections + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: BEAT_INTERVAL = 3 def connections @@ -19,7 +18,7 @@ module ActionCable end # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..49cbaec0c0 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -4,8 +4,8 @@ require 'concurrent' module ActionCable module Server - # Worker used by Server.send_async to do connection work in threads. Only for internal use. - class Worker + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: include ActiveSupport::Callbacks thread_mattr_accessor :connection @@ -20,6 +20,26 @@ module ActionCable ) end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @pool.kill + end + + def stopping? + @pool.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -27,19 +47,15 @@ module ActionCable end def invoke(receiver, method, *args) - begin - self.connection = receiver - - run_callbacks :work do + work(receiver) do + begin receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end end end @@ -50,14 +66,8 @@ module ActionCable end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection - - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + work(channel.connection) do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index ecece4e270..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,10 +12,8 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index d697548cbd..e6c862959b 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -13,6 +13,14 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new + # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil @@ -41,7 +49,7 @@ module ActionCable def redis_connection_for_subscriptions ensure_reactor_running @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." end @@ -51,7 +59,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 7076383efe..6b4236e7d3 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,6 +6,10 @@ require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @listener = nil @@ -29,7 +33,7 @@ module ActionCable end def redis_connection_for_subscriptions - ::Redis.new(@server.config.cable) + redis_connection end private @@ -39,10 +43,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= redis_connection end end + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + class Listener < SubscriberMap def initialize(adapter) super() diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c5d398810a..6debe40c91 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -15,12 +15,29 @@ module Rails if options[:assets] template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") end + + generate_application_cable_files end protected def file_name @_file_name ||= super.gsub(/\_channel/i, '') end + + # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. + def generate_application_cable_files + return if self.behavior != :invoke + + files = [ + 'application_cable/channel.rb', + 'application_cable/connection.rb' + ] + + files.each do |name| + path = File.join('app/channels/', name) + template(name, path) if !File.exist?(path) + end + end end end end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb new file mode 100644 index 0000000000..d56fa30f4d --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb new file mode 100644 index 0000000000..b4f41389ad --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Connection < ActionCable::Connection::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed |