diff options
Diffstat (limited to 'actioncable/lib')
41 files changed, 728 insertions, 358 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 1dc66ef3ad..d353716636 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -21,21 +21,22 @@ # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #++ -require 'active_support' -require 'active_support/rails' -require 'action_cable/version' +require "active_support" +require "active_support/rails" +require "action_cable/version" module ActionCable extend ActiveSupport::Autoload INTERNAL = { - identifiers: { - ping: '_ping'.freeze - }, message_types: { - confirmation: 'confirm_subscription'.freeze, - rejection: 'reject_subscription'.freeze - } + welcome: "welcome".freeze, + ping: "ping".freeze, + confirmation: "confirm_subscription".freeze, + rejection: "reject_subscription".freeze + }, + default_mount_path: "/cable".freeze, + protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 874ebe2e71..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -1,4 +1,4 @@ -require 'set' +require "set" module ActionCable module Channel @@ -32,8 +32,8 @@ module ActionCable # # == Action processing # - # Unlike subclasses of ActionController::Base, channels do not follow a REST - # constraint form for their actions. Instead, ActionCable operates through a + # Unlike subclasses of ActionController::Base, channels do not follow a RESTful + # constraint form for their actions. Instead, Action Cable operates through a # remote-procedure call model. You can declare any public method on the # channel (optionally taking a <tt>data</tt> argument), and this method is # automatically exposed as callable to the client. @@ -63,10 +63,10 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they + # In this example, the subscribed and unsubscribed methods are not callable methods, as they # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt> # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not - # callable as it's a private method. You'll see that appear accepts a data + # callable, since it's a private method. You'll see that appear accepts a data # parameter, which it then uses as part of its model call. <tt>#away</tt> # does not, since it's simply a trigger action. # @@ -125,7 +125,7 @@ module ActionCable protected # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time - # you run action_methods, they will be recalculated + # you run action_methods, they will be recalculated. def clear_action_methods! @action_methods = nil end @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -160,21 +161,34 @@ module ActionCable action = extract_action(data) if processable_action?(action) - dispatch_action(action, data) + payload = { channel_class: self.class.name, action: action, data: data } + ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do + dispatch_action(action, data) + end else logger.error "Unable to process #{action_signature(action, data)}" end end - # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. - def unsubscribe_from_channel + def unsubscribe_from_channel # :nodoc: run_callbacks :unsubscribe do unsubscribed end end - protected # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams # you want this channel to be sending to the subscriber. @@ -183,7 +197,7 @@ module ActionCable end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking - # people as offline or the like. + # users as offline or the like. def unsubscribed # Override in subclasses end @@ -191,16 +205,26 @@ module ActionCable # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } + + payload = { channel_class: self.class.name, data: data, via: via } + ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do + connection.transmit identifier: @identifier, message: data + end + end + + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? end def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -224,26 +248,12 @@ module ActionCable end end - - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - - def extract_action(data) - (data['action'].presence || :receive).to_sym + (data["action"].presence || :receive).to_sym end def processable_action?(action) - self.class.action_methods.include?(action.to_s) + self.class.action_methods.include?(action.to_s) unless subscription_rejected? end def dispatch_action(action, data) @@ -258,7 +268,7 @@ module ActionCable def action_signature(action, data) "#{self.class.name}##{action}".tap do |signature| - if (arguments = data.except('action')).any? + if (arguments = data.except("action")).any? signature << "(#{arguments.inspect})" end end @@ -267,8 +277,11 @@ module ActionCable def transmit_subscription_confirmation unless subscription_confirmation_sent? logger.info "#{self.class.name} is transmitting the subscription confirmation" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) - @subscription_confirmation_sent = true + + ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] + @subscription_confirmation_sent = true + end end end @@ -279,7 +292,10 @@ module ActionCable def transmit_subscription_rejection logger.info "#{self.class.name} is transmitting the subscription rejection" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + + ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] + end end end end diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb index afc23d7d1a..23ed4ec943 100644 --- a/actioncable/lib/action_cable/channel/broadcasting.rb +++ b/actioncable/lib/action_cable/channel/broadcasting.rb @@ -1,4 +1,4 @@ -require 'active_support/core_ext/object/to_param' +require "active_support/core_ext/object/to_param" module ActionCable module Channel @@ -16,7 +16,7 @@ module ActionCable def broadcasting_for(model) #:nodoc: case when model.is_a?(Array) - model.map { |m| broadcasting_for(m) }.join(':') + model.map { |m| broadcasting_for(m) }.join(":") when model.respond_to?(:to_gid_param) model.to_gid_param else diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb index 295d750e86..c740132c94 100644 --- a/actioncable/lib/action_cable/channel/callbacks.rb +++ b/actioncable/lib/action_cable/channel/callbacks.rb @@ -1,4 +1,4 @@ -require 'active_support/callbacks' +require "active_support/callbacks" module ActionCable module Channel diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index 4c9d53b15a..b7e88bf73d 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -10,8 +10,9 @@ module ActionCable # # ChatChannel.channel_name # => 'chat' # Chats::AppearancesChannel.channel_name # => 'chats:appearances' + # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name - @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore + @channel_name ||= name.sub(/Channel$/, "").gsub("::",":").underscore end end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..c9daa0bcd3 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,11 +12,42 @@ module ActionCable end module ClassMethods - # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful - # for sending a steady flow of updates to a client based off an object that was configured on subscription. - # It's an alternative to using streams if the channel is able to do the work internally. - def periodically(callback, every:) - self.periodic_timers += [ [ callback, every: every ] ] + # Periodically performs a task on the channel, like updating an online + # user counter, polling a backend for new status messages, sending + # regular "heartbeat" messages, or doing some internal work and giving + # progress updates. + # + # Pass a method name or lambda argument or provide a block to call. + # Specify the calling period in seconds using the <tt>every:</tt> + # keyword argument. + # + # periodically :transmit_progress, every: 5.seconds + # + # periodically every: 3.minutes do + # transmit action: :update_count, count: current_count + # end + # + def periodically(callback_or_method_name = nil, every:, &block) + callback = + if block_given? + raise ArgumentError, "Pass a block or provide a callback arg, not both" if callback_or_method_name + block + else + case callback_or_method_name + when Proc + callback_or_method_name + when Symbol + -> { __send__ callback_or_method_name } + else + raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}" + end + end + + unless every.kind_of?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + self.periodic_timers += [[ callback, every: every ]] end end @@ -27,14 +58,19 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) - end + active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every)) + end + end + + def start_periodic_timer(callback, every:) + connection.server.event_loop.timer every do + connection.worker_pool.async_exec self, connection: connection, &callback end end def stop_periodic_timers active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.clear end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 3158f30814..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,8 +1,8 @@ module ActionCable module Channel - # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data - # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data + # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -18,13 +18,15 @@ module ActionCable # end # end # - # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. - # That looks like so from that side of things: + # Based on the above example, the subscribers of this channel will get whatever data is put into the, + # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there. + # + # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. - # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` + # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>. # # class CommentsChannel < ApplicationCable::Channel # def subscribed @@ -37,16 +39,14 @@ module ActionCable # # CommentsChannel.broadcast_to(@post, @comment) # - # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. - # Example below shows how you can use this to provide performance introspection in the process: + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. + # The below example shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -69,16 +69,22 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) - # Hold off the confirmation until pubsub#subscribe is successful + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) + broadcasting = String(broadcasting) + + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) + streams << [ broadcasting, handler ] - Concurrent.global_io_executor.post do - pubsub.subscribe(broadcasting, callback, lambda do - transmit_subscription_confirmation + connection.server.event_loop.post do + pubsub.subscribe(broadcasting, handler, lambda do + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end @@ -87,8 +93,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -106,11 +115,60 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to the client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb index 070a70e4e2..85df206445 100644 --- a/actioncable/lib/action_cable/connection/authorization.rb +++ b/actioncable/lib/action_cable/connection/authorization.rb @@ -10,4 +10,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 1acef93025..dfee123ea2 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -1,10 +1,10 @@ -require 'action_dispatch' +require "action_dispatch" module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,14 +33,14 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # - # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. + # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log. # # Pretty simple, eh? class Base @@ -48,15 +48,16 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol + delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder + @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -65,8 +66,8 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -76,20 +77,22 @@ module ActionCable end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -102,19 +105,19 @@ module ActionCable worker_pool.async_invoke(self, method, *arguments) end - # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. + # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>. # This can be returned by a health check against the connection. def statistics { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers, - request_id: @env['action_dispatch.request_id'] + request_id: @env["action_dispatch.request_id"] } end def beat - transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -151,10 +154,19 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open + @protocol = websocket.protocol connect if respond_to?(:connect) subscribe_to_internal_channel - beat + send_welcome_message message_buffer.process! server.add_connection(self) @@ -173,10 +185,20 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def send_welcome_message + # Send welcome message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit type: ActionCable::INTERNAL[:message_types][:welcome] + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection - if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] } + proto = Rack::Request.new(env).ssl? ? "https" : "http" + if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}" + true + elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } true else logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") @@ -194,7 +216,7 @@ module ActionCable logger.error invalid_request_message logger.info finished_request_message - [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] + [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ] end # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. @@ -207,7 +229,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', + websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_s ] end @@ -215,19 +237,19 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', + websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_s ] end def invalid_request_message - 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end def successful_request_message - 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index ef937d7c16..70a2bbecb1 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -1,4 +1,4 @@ -require 'websocket/driver' +require "websocket/driver" module ActionCable module Connection @@ -8,16 +8,16 @@ module ActionCable # Copyright (c) 2010-2015 James Coglan class ClientSocket # :nodoc: def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' + scheme = secure_request?(env) ? "wss:" : "ws:" "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" end def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' + return true if env["HTTPS"] == "on" + return true if env["HTTP_X_FORWARDED_SSL"] == "on" + return true if env["HTTP_X_FORWARDED_SCHEME"] == "https" + return true if env["HTTP_X_FORWARDED_PROTO"] == "https" + return true if env["rack.url_scheme"] == "https" return false end @@ -29,35 +29,37 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop + def initialize(env, event_target, event_loop, protocols) + @env = env + @event_target = event_target + @event_loop = event_loop @url = ClientSocket.determine_url(@env) @driver = @driver_started = nil - @close_params = ['', 1006] + @close_params = ["", 1006] @ready_state = CONNECTING # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) + @driver = ::WebSocket::Driver.rack(self, protocols: protocols) @driver.on(:open) { |e| open } @driver.on(:message) { |e| receive_message(e.data) } @driver.on(:close) { |e| begin_close(e.reason, e.code) } @driver.on(:error) { |e| emit_error(e.message) } - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) - - if callback = @env['async.callback'] - callback.call([101, {}, @stream]) - end + @stream = ActionCable::Connection::Stream.new(@event_loop, self) end def start_driver return if @driver.nil? || @driver_started + @stream.hijack_rack_socket + + if callback = @env["async.callback"] + callback.call([101, {}, @stream]) + end + @driver_started = true @driver.start end @@ -69,23 +71,25 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) return false if @ready_state > OPEN case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) else false end end def close(code = nil, reason = nil) code ||= 1000 - reason ||= '' + reason ||= "" - unless code == 1000 or (code >= 3000 and code <= 4999) + unless code == 1000 || (code >= 3000 && code <= 4999) raise ArgumentError, "Failed to execute 'close' on WebSocket: " + "The code must be either 1000, or between 3000 and 4999. " + "#{code} is neither." @@ -107,6 +111,10 @@ module ActionCable @ready_state == OPEN end + def protocol + @driver.protocol + end + private def open return unless @ready_state == CONNECTING @@ -132,11 +140,8 @@ module ActionCable @ready_state = CLOSING @close_params = [reason, code] - if @stream - @stream.shutdown - else - finalize_close - end + @stream.shutdown if @stream + finalize_close end def finalize_close diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..c91a1d1fd7 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -1,4 +1,4 @@ -require 'set' +require "set" module ActionCable module Connection @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..8f0ec766c3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,26 +11,24 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - - case message['type'] - when 'disconnect' + case message["type"] + when "disconnect" logger.info "Removing connection (#{connection_identifier})" websocket.close end diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] @@ -31,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -39,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index ace250cd16..e620b93845 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -1,18 +1,22 @@ +require "thread" + module ActionCable module Connection #-- # This class is heavily based on faye-websocket-ruby # # Copyright (c) 2010-2015 James Coglan - class Stream + class Stream # :nodoc: def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket - @stream_send = socket.env['stream.send'] + @stream_send = socket.env["stream.send"] @rack_hijack_io = nil + @write_lock = Mutex.new - hijack_rack_socket + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -29,26 +33,76 @@ module ActionCable end def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send - rescue EOFError + if @stream_send + return @stream_send.call(data) + end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + def hijack_rack_socket + return unless @socket_object.env["rack.hijack"] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + @socket_object.env["rack.hijack"].call + @rack_hijack_io = @socket_object.env["rack.hijack_io"] - @event_loop.attach(@rack_hijack_io, self) - end + @event_loop.attach(@rack_hijack_io, self) + end + private def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index e6335082d2..2d1af0ff9f 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -1,23 +1,33 @@ -require 'nio' -require 'thread' +require "nio" +require "thread" module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @spawn_mutex = Mutex.new + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -26,6 +36,16 @@ module ActionCable @todo << lambda do @nio.deregister io @map.delete io + io.close + end + wakeup + end + + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end end wakeup end @@ -43,6 +63,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -68,12 +95,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -1,20 +1,20 @@ -require 'active_support/core_ext/hash/indifferent_access' +require "active_support/core_ext/hash/indifferent_access" module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} end def execute_command(data) - case data['command'] - when 'subscribe' then add data - when 'unsubscribe' then remove data - when 'message' then perform_action data + case data["command"] + when "subscribe" then add data + when "unsubscribe" then remove data + when "message" then perform_action data else logger.error "Received unrecognized command in #{data.inspect}" end @@ -23,21 +23,25 @@ module ActionCable end def add(data) - id_key = data['identifier'] + id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - subscription_klass = connection.server.channel_classes[id_options[:channel]] + return if subscriptions.key?(id_key) - if subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription_klass = id_options[:channel].safe_constantize + + if subscription_klass && ActionCable::Channel::Base >= subscription_klass + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else - logger.error "Subscription class not found (#{data.inspect})" + logger.error "Subscription class not found: #{id_options[:channel].inspect}" end end def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[data['identifier']] + remove_subscription subscriptions[data["identifier"]] end def remove_subscription(subscription) @@ -46,7 +50,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action ActiveSupport::JSON.decode(data['data']) + find(data).perform_action ActiveSupport::JSON.decode(data["data"]) end def identifiers @@ -54,7 +58,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected @@ -64,7 +68,7 @@ module ActionCable delegate :logger, to: :connection def find(data) - if subscription = subscriptions[data['identifier']] + if subscription = subscriptions[data["identifier"]] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..382141b89f 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,11 @@ -require 'websocket/driver' +require "websocket/driver" module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? @@ -24,6 +24,10 @@ module ActionCable websocket.close end + def protocol + websocket.protocol + end + def rack_response websocket.rack_response end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index f5e233e091..4c5c975cd8 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -4,9 +4,9 @@ require "action_cable/helpers/action_cable_helper" require "active_support/core_ext/hash/indifferent_access" module ActionCable - class Railtie < Rails::Engine # :nodoc: + class Engine < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.url = '/cable' + config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] config.eager_load_namespaces << ActionCable @@ -22,7 +22,7 @@ module ActionCable initializer "action_cable.set_configs" do |app| options = app.config.action_cable - options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? + options.allowed_request_origins ||= /https?:\/\/localhost:\d+/ if ::Rails.env.development? app.paths.add "config/cable", with: "config/cable.yml" @@ -31,8 +31,47 @@ module ActionCable self.cable = Rails.application.config_for(config_path).with_indifferent_access end + previous_connection_class = self.connection_class + self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call } + options.each { |k,v| send("#{k}=", v) } end end + + initializer "action_cable.routes" do + config.after_initialize do |app| + config = app.config + unless config.action_cable.mount_path.nil? + app.routes.prepend do + mount ActionCable.server => config.action_cable.mount_path, internal: true + end + end + end + end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index a71603e61a..8ba0230d47 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -6,9 +6,9 @@ module ActionCable module VERSION MAJOR = 5 - MINOR = 0 + MINOR = 1 TINY = 0 - PRE = "beta2" + PRE = "alpha" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index b82751468a..2081a37db6 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -1,28 +1,39 @@ module ActionCable module Helpers module ActionCableHelper - # Returns an "action-cable-url" meta tag with the value of the url specified in your - # configuration. Ensure this is above your javascript tag: + # Returns an "action-cable-url" meta tag with the value of the URL specified in your + # configuration. Ensure this is above your JavaScript tag: # # <head> # <%= action_cable_meta_tag %> # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> # </head> # - # This is then used by ActionCable to determine the url of your websocket server. + # This is then used by Action Cable to determine the URL of your WebSocket server. # Your CoffeeScript can then connect to the server without needing to specify the - # url directly: + # URL directly: # # #= require cable # @App = {} # App.cable = Cable.createConsumer() # - # Make sure to specify the correct server location in each of your environments - # config file: + # Make sure to specify the correct server location in each of your environment + # config files: + # + # config.action_cable.mount_path = "/cable123" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="/cable123" /> + # + # config.action_cable.url = "ws://actioncable.com" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="ws://actioncable.com" /> # - # config.action_cable.url = "ws://example.com:28080" def action_cable_meta_tag - tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url + tag "meta", name: "action-cable-url", content: ( + ActionCable.server.config.url || + ActionCable.server.config.mount_path || + raise("No Action Cable URL configured -- please configure this at config.action_cable.url") + ) end end end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 7ec121308a..720ba52d19 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -13,8 +13,8 @@ module ActionCable # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # # This will disconnect all the connections established for - # <tt>User.find(1)</tt> across all servers running on all machines, because - # it uses the internal channel that all these servers are subscribed to. + # <tt>User.find(1)</tt>, across all servers running on all machines, because + # it uses the internal channel that all of these servers are subscribed to. class RemoteConnections attr_reader :server @@ -28,7 +28,7 @@ module ActionCable private # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>. - # Exists for the solely for the purpose of calling #disconnect on that connection. + # Exists solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end @@ -41,7 +41,7 @@ module ActionCable # Uses the internal channel to disconnect the connection. def disconnect - server.broadcast internal_channel, type: 'disconnect' + server.broadcast internal_channel, type: "disconnect" end # Returns all the identifiers that were applied to this connection. diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3..22f9353825 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -9,7 +9,7 @@ module ActionCable autoload :Configuration autoload :Worker - autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' + autoload :ActiveRecordConnectionManagement, "action_cable/server/worker/active_record_connection_management" end end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index fe48c112df..419eccd73c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,11 +1,11 @@ -require 'thread' +require "monitor" module ActionCable module Server - # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but - # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers. + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. # - # Also, this is the server instance used for broadcasting. See Broadcasting for details. + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. class Base include ActionCable::Server::Broadcasting include ActionCable::Server::Connections @@ -18,15 +18,14 @@ module ActionCable attr_reader :mutex def initialize - @mutex = Mutex.new - - @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @pubsub = nil end - # Called by rack to setup the server. + # Called by Rack to setup the server. def call(env) setup_heartbeat_timer - config.connection_class.new(self, env).process + config.connection_class.call.new(self, env).process end # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections. @@ -34,38 +33,52 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + # Shutdown the worker pool + @worker_pool.halt if @worker_pool + @worker_pool = nil + + # Shutdown the pub/sub adapter + @pubsub.shutdown if @pubsub + @pubsub = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end - def stream_event_loop - @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # database connection pool instead. def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end - # Requires and returns a hash of all the channel class constants keyed by name. - def channel_classes - @channel_classes || @mutex.synchronize do - @channel_classes ||= begin - config.channel_paths.each { |channel_path| require channel_path } - config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } - end - end - end - # Adapter used for all streams/broadcasting. def pubsub @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end - # All the identifiers applied to the connection class associated with this server. + # All of the identifiers applied to the connection class associated with this server. def connection_identifiers - config.connection_class.identifiers + config.connection_class.call.identifiers end end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index b87232671b..1fc58baa3e 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,6 +1,6 @@ module ActionCable module Server - # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel @@ -9,37 +9,42 @@ module ActionCable # end # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob + # # Somewhere in your app this is called, perhaps from a NewCommentJob: # ActionCable.server.broadcast \ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: # App.cable.subscriptions.create "WebNotificationsChannel", # received: (data) -> # new Notification data['title'], body: data['body'] module Broadcasting - # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, broadcasting) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + + payload = { broadcasting: broadcasting, message: message, coder: coder } + ActiveSupport::Notifications.instrument("broadcast.action_cable", payload) do + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded + end end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a248933c4..17e0dee064 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -1,42 +1,28 @@ module ActionCable module Server - # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size - attr_accessor :channel_load_paths - attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :cable, :url + attr_accessor :disable_request_forgery_protection, :allowed_request_origins, :allow_same_origin_as_host + attr_accessor :cable, :url, :mount_path def initialize @log_tags = [] - @connection_class = ApplicationCable::Connection - @worker_pool_size = 100 - - @channel_load_paths = [Rails.root.join('app/channels')] + @connection_class = -> { ActionCable::Connection::Base } + @worker_pool_size = 4 @disable_request_forgery_protection = false - end - - def channel_paths - @channel_paths ||= channel_load_paths.flat_map do |path| - Dir["#{path}/**/*_channel.rb"] - end - end - - def channel_class_names - @channel_class_names ||= channel_paths.collect do |channel_path| - Pathname.new(channel_path).basename.to_s.split('.').first.camelize - end + @allow_same_origin_as_host = true end # Returns constant of subscription adapter specified in config/cable.yml. # If the adapter cannot be found, this will default to the Redis adapter. # Also makes sure proper dependencies are required. def pubsub_adapter - adapter = (cable.fetch('adapter') { 'redis' }) + adapter = (cable.fetch("adapter") { "redis" }) path_to_adapter = "action_cable/subscription_adapter/#{adapter}" begin require path_to_adapter @@ -47,7 +33,7 @@ module ActionCable end adapter = adapter.camelize - adapter = 'PostgreSQL' if adapter == 'Postgresql' + adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -1,9 +1,8 @@ module ActionCable module Server - # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so - # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. - # As such, this is primarily for internal use. - module Connections + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: BEAT_INTERVAL = 3 def connections @@ -19,12 +18,12 @@ module ActionCable end # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..43639c27af 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,63 +1,67 @@ -require 'active_support/callbacks' -require 'active_support/core_ext/module/attribute_accessors_per_thread' -require 'concurrent' +require "active_support/callbacks" +require "active_support/core_ext/module/attribute_accessors_per_thread" +require "concurrent" module ActionCable module Server - # Worker used by Server.send_async to do connection work in threads. Only for internal use. - class Worker + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: include ActiveSupport::Callbacks thread_mattr_accessor :connection define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, ) end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) - end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @executor.shutdown end - def invoke(receiver, method, *args) - begin - self.connection = receiver + def stopping? + @executor.shuttingdown? + end - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + def work(connection) + self.connection = connection - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + run_callbacks :work do + yield end + ensure + self.connection = nil + end + + def async_exec(receiver, *args, connection:, &block) + async_invoke receiver, :instance_exec, *args, connection: connection, &block end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) + def async_invoke(receiver, method, *args, connection: receiver, &block) + @executor.post do + invoke(receiver, method, *args, connection: connection, &block) end end - def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection + def invoke(receiver, method, *args, connection:, &block) + work(connection) do + begin + receiver.send method, *args, &block + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + receiver.handle_exception if receiver.respond_to?(:handle_exception) end - ensure - self.connection = nil end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index ecece4e270..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,10 +12,8 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index cca6894289..46819dbfec 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -1,20 +1,25 @@ -require 'action_cable/subscription_adapter/inline' +require "action_cable/subscription_adapter/inline" module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private def new_subscriber_map - AsyncSubscriberMap.new + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index af04a58c70..bcd46d2a0e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -1,9 +1,9 @@ -require 'thread' +require "thread" -gem 'em-hiredis', '~> 0.3.0' -gem 'redis', '~> 3.0' -require 'em-hiredis' -require 'redis' +gem "em-hiredis", "~> 0.3.0" +gem "redis", "~> 3.0" +require "em-hiredis" +require "redis" EventMachine.epoll if EventMachine.epoll? EventMachine.kqueue if EventMachine.kqueue? @@ -13,11 +13,11 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new - # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis. + # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis. # This is needed, for example, when using Makara proxies for distributed Redis. cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } - # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis. # This is needed, for example, when using Makara proxies for distributed Redis. cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } @@ -51,7 +51,11 @@ module ActionCable @redis_connection_for_subscriptions || @server.mutex.synchronize do @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @logger.error "[ActionCable] Redis reconnect failed." + end + + redis.on(:failed) do + @logger.error "[ActionCable] Redis connection has failed." end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index abaeb92e54..bdab5205ec 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -1,6 +1,6 @@ -gem 'pg', '~> 0.18' -require 'pg' -require 'thread' +gem "pg", "~> 0.18" +require "pg" +require "thread" module ActionCable module SubscriptionAdapter @@ -33,7 +33,7 @@ module ActionCable pg_conn = ar_conn.raw_connection unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter" end yield pg_conn @@ -42,14 +42,15 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -68,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -98,7 +99,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index ba4934a264..62bd284a6b 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,7 +1,7 @@ -require 'thread' +require "thread" -gem 'redis', '~> 3.0' -require 'redis' +gem "redis", "~> 3.0" +require "redis" module ActionCable module SubscriptionAdapter @@ -33,25 +33,30 @@ module ActionCable end def redis_connection_for_subscriptions - ::Redis.new(@server.config.cable) + redis_connection end private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) + @redis_connection_for_broadcasts ||= redis_connection end end + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @@ -67,7 +72,7 @@ module ActionCable conn.without_reconnect do original_client = conn.client - conn.subscribe('_action_cable_internal') do |on| + conn.subscribe("_action_cable_internal") do |on| on.subscribe do |chan, count| @subscription_lock.synchronize do if count == 1 @@ -80,7 +85,7 @@ module ActionCable if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift - Concurrent.global_io_executor << next_callback if next_callback + @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end @@ -106,7 +111,7 @@ module ActionCable return if @thread.nil? when_connected do - send_command('unsubscribe') + send_command("unsubscribe") @raw_client = nil end end @@ -118,18 +123,18 @@ module ActionCable @subscription_lock.synchronize do ensure_listener_running @subscribe_callbacks[channel] << on_success - when_connected { send_command('subscribe', channel) } + when_connected { send_command("subscribe", channel) } end end def remove_channel(channel) @subscription_lock.synchronize do - when_connected { send_command('unsubscribe', channel) } + when_connected { send_command("unsubscribe", channel) } end end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end private diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb index 37eed09793..4ec513e3ba 100644 --- a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -32,7 +32,11 @@ module ActionCable end def broadcast(channel, message) - list = @sync.synchronize { @subscribers[channel].dup } + list = @sync.synchronize do + return if !@subscribers.key?(channel) + @subscribers[channel].dup + end + list.each do |subscriber| invoke_callback(subscriber, message) end diff --git a/actioncable/lib/action_cable/version.rb b/actioncable/lib/action_cable/version.rb index e17877202b..d6081409f0 100644 --- a/actioncable/lib/action_cable/version.rb +++ b/actioncable/lib/action_cable/version.rb @@ -1,4 +1,4 @@ -require_relative 'gem_version' +require_relative "gem_version" module ActionCable # Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt> diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index 27a934c689..6249553c22 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -3,7 +3,7 @@ Description: Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascript/cable.coffee after generating any channels. + Note: Turn on the cable connection in app/assets/javascript/cable.js after generating any channels. Example: ======== diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c5d398810a..20d807c033 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -10,16 +10,37 @@ module Rails check_class_collision suffix: "Channel" def create_channel_file - template "channel.rb", File.join('app/channels', class_path, "#{file_name}_channel.rb") + template "channel.rb", File.join("app/channels", class_path, "#{file_name}_channel.rb") if options[:assets] - template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") + if self.behavior == :invoke + template "assets/cable.js", "app/assets/javascripts/cable.js" + end + + js_template "assets/channel", File.join("app/assets/javascripts/channels", class_path, "#{file_name}") end + + generate_application_cable_files end protected def file_name - @_file_name ||= super.gsub(/\_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, "") + end + + # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. + def generate_application_cable_files + return if self.behavior != :invoke + + files = [ + "application_cable/channel.rb", + "application_cable/connection.rb" + ] + + files.each do |name| + path = File.join("app/channels/", name) + template(name, path) if !File.exist?(path) + end end end end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb new file mode 100644 index 0000000000..d672697283 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -0,0 +1,4 @@ +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb new file mode 100644 index 0000000000..0ff5442f47 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -0,0 +1,4 @@ +module ApplicationCable + class Connection < ActionCable::Connection::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/assets/cable.js b/actioncable/lib/rails/generators/channel/templates/assets/cable.js new file mode 100644 index 0000000000..739aa5f022 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/assets/cable.js @@ -0,0 +1,13 @@ +// Action Cable provides the framework to deal with WebSockets in Rails. +// You can generate new channels where WebSocket features live using the `rails generate channel` command. +// +//= require action_cable +//= require_self +//= require_tree ./channels + +(function() { + this.App || (this.App = {}); + + App.cable = ActionCable.createConsumer(); + +}).call(this); diff --git a/actioncable/lib/rails/generators/channel/templates/assets/channel.js b/actioncable/lib/rails/generators/channel/templates/assets/channel.js new file mode 100644 index 0000000000..ab0e68b11a --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/assets/channel.js @@ -0,0 +1,18 @@ +App.<%= class_name.underscore %> = App.cable.subscriptions.create("<%= class_name %>Channel", { + connected: function() { + // Called when the subscription is ready for use on the server + }, + + disconnected: function() { + // Called when the subscription has been terminated by the server + }, + + received: function(data) { + // Called when there's incoming data on the websocket for this channel + }<%= actions.any? ? ",\n" : '' %> +<% actions.each do |action| -%> + <%=action %>: function() { + return this.perform('<%= action %>'); + }<%= action == actions[-1] ? '' : ",\n" %> +<% end -%> +}); diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 7bff3341c1..4bcfb2be4d 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,3 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed |