diff options
Diffstat (limited to 'actioncable/lib')
47 files changed, 1224 insertions, 632 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 1dc66ef3ad..b6d2842867 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -29,13 +29,14 @@ module ActionCable extend ActiveSupport::Autoload INTERNAL = { - identifiers: { - ping: '_ping'.freeze - }, message_types: { + welcome: 'welcome'.freeze, + ping: 'ping'.freeze, confirmation: 'confirm_subscription'.freeze, rejection: 'reject_subscription'.freeze - } + }, + default_mount_path: '/cable'.freeze, + protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 88cdc1cab1..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,11 @@ module ActionCable # # == Action processing # - # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can - # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # Unlike subclasses of ActionController::Base, channels do not follow a RESTful + # constraint form for their actions. Instead, Action Cable operates through a + # remote-procedure call model. You can declare any public method on the + # channel (optionally taking a <tt>data</tt> argument), and this method is + # automatically exposed as callable to the client. # # Example: # @@ -60,18 +63,22 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away - # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then - # uses as part of its model call. #away does not, it's simply a trigger action. + # In this example, the subscribed and unsubscribed methods are not callable methods, as they + # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt> + # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not + # callable, since it's a private method. You'll see that appear accepts a data + # parameter, which it then uses as part of its model call. <tt>#away</tt> + # does not, since it's simply a trigger action. # - # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. - # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # Also note that in this example, <tt>current_user</tt> is available because + # it was marked as an identifying attribute on the connection. All such + # identifiers will automatically create a delegation method of the same name + # on the channel instance. # # == Rejecting subscription requests # - # A channel can reject a subscription request in the #subscribed callback by invoking #reject! - # - # Example: + # A channel can reject a subscription request in the #subscribed callback by + # invoking the #reject method: # # class ChatChannel < ApplicationCable::Channel # def subscribed @@ -80,8 +87,10 @@ module ActionCable # end # end # - # In this example, the subscription will be rejected if the current_user does not have access to the chat room. - # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + # In this example, the subscription will be rejected if the + # <tt>current_user</tt> does not have access to the chat room. On the + # client-side, the <tt>Channel#rejected</tt> callback will get invoked when + # the server rejects the subscription request. class Base include Callbacks include PeriodicTimers @@ -116,7 +125,7 @@ module ActionCable protected # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time - # you run action_methods, they will be recalculated + # you run action_methods, they will be recalculated. def clear_action_methods! @action_methods = nil end @@ -151,15 +160,18 @@ module ActionCable action = extract_action(data) if processable_action?(action) - dispatch_action(action, data) + payload = { channel_class: self.class.name, action: action, data: data } + ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do + dispatch_action(action, data) + end else logger.error "Unable to process #{action_signature(action, data)}" end end - # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. - def unsubscribe_from_channel + def unsubscribe_from_channel # :nodoc: run_callbacks :unsubscribe do unsubscribed end @@ -174,7 +186,7 @@ module ActionCable end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking - # people as offline or the like. + # users as offline or the like. def unsubscribed # Override in subclasses end @@ -182,8 +194,12 @@ module ActionCable # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } + + payload = { channel_class: self.class.name, data: data, via: via } + ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do + connection.transmit identifier: @identifier, message: data + end end def defer_subscription_confirmation! @@ -215,7 +231,6 @@ module ActionCable end end - def subscribe_to_channel run_callbacks :subscribe do subscribed @@ -228,7 +243,6 @@ module ActionCable end end - def extract_action(data) (data['action'].presence || :receive).to_sym end @@ -258,8 +272,11 @@ module ActionCable def transmit_subscription_confirmation unless subscription_confirmation_sent? logger.info "#{self.class.name} is transmitting the subscription confirmation" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) - @subscription_confirmation_sent = true + + ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] + @subscription_confirmation_sent = true + end end end @@ -270,7 +287,10 @@ module ActionCable def transmit_subscription_rejection logger.info "#{self.class.name} is transmitting the subscription rejection" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + + ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] + end end end end diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index 4c9d53b15a..8e1b2a4af0 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -10,6 +10,7 @@ module ActionCable # # ChatChannel.channel_name # => 'chat' # Chats::AppearancesChannel.channel_name # => 'chats:appearances' + # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..dab604440f 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,11 +12,42 @@ module ActionCable end module ClassMethods - # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful - # for sending a steady flow of updates to a client based off an object that was configured on subscription. - # It's an alternative to using streams if the channel is able to do the work internally. - def periodically(callback, every:) - self.periodic_timers += [ [ callback, every: every ] ] + # Periodically performs a task on the channel, like updating an online + # user counter, polling a backend for new status messages, sending + # regular "heartbeat" messages, or doing some internal work and giving + # progress updates. + # + # Pass a method name or lambda argument or provide a block to call. + # Specify the calling period in seconds using the <tt>every:</tt> + # keyword argument. + # + # periodically :transmit_progress, every: 5.seconds + # + # periodically every: 3.minutes do + # transmit action: :update_count, count: current_count + # end + # + def periodically(callback_or_method_name = nil, every:, &block) + callback = + if block_given? + raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + block + else + case callback_or_method_name + when Proc + callback_or_method_name + when Symbol + -> { __send__ callback_or_method_name } + else + raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}" + end + end + + unless every.kind_of?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + self.periodic_timers += [[ callback, every: every ]] end end @@ -27,14 +58,21 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) + active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every)) + end + end + + def start_periodic_timer(callback, every:) + connection.server.event_loop.timer every do + connection.worker_pool.async_invoke connection do + instance_exec(&callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.clear end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..561750d713 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,8 +1,8 @@ module ActionCable module Channel - # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data - # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data + # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -18,13 +18,15 @@ module ActionCable # end # end # - # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. - # That looks like so from that side of things: + # Based on the above example, the subscribers of this channel will get whatever data is put into the, + # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there. + # + # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. - # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` + # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>. # # class CommentsChannel < ApplicationCable::Channel # def subscribed @@ -37,26 +39,25 @@ module ActionCable # # CommentsChannel.broadcast_to(@post, @comment) # - # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. - # Example below shows how you can use this to provide performance introspection in the process: + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. + # The below example shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel - # def subscribed - # @room = Chat::Room[params[:room_number]] - # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) + # def subscribed + # @room = Chat::Room[params[:room_number]] # - # if message['originated_at'].present? - # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) + # stream_for @room, coder: ActiveSupport::JSON do |message| + # if message['originated_at'].present? + # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # - # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing - # logger.info "Message took #{elapsed_time}s to arrive" - # end + # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing + # logger.info "Message took #{elapsed_time}s to arrive" + # end # - # transmit message - # end - # end + # transmit message + # end + # end + # end # # You can stop streaming from all broadcasts by calling #stop_all_streams. module Streams @@ -68,15 +69,21 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) - # Hold off the confirmation until pubsub#subscribe is successful + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) + broadcasting = String(broadcasting) + + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) + streams << [ broadcasting, handler ] - EM.next_tick do - pubsub.subscribe(broadcasting, callback, lambda do + connection.server.event_loop.post do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -86,10 +93,14 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end + # Unsubscribes all streams associated with this channel from the pubsub queue. def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback @@ -104,11 +115,60 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to the client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..5f813cf8e0 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,17 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel + autoload :FayeClientSocket + autoload :FayeEventLoop autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..75c1299e36 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -2,9 +2,9 @@ require 'action_dispatch' module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,14 +33,14 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many - # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many + # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # - # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. + # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log. # # Pretty simple, eh? class Base @@ -48,15 +48,16 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol + delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder + @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -65,35 +66,33 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -118,7 +117,23 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i + end + + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close(reason, code) # :nodoc: + send_async :handle_close end protected @@ -139,10 +154,19 @@ module ActionCable attr_reader :message_buffer private - def on_open + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + + def handle_open + @protocol = websocket.protocol connect if respond_to?(:connect) subscribe_to_internal_channel - beat + send_welcome_message message_buffer.process! server.add_connection(self) @@ -150,11 +174,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) @@ -165,6 +185,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def send_welcome_message + # Send welcome message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit type: ActionCable::INTERNAL[:message_types][:welcome] + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection @@ -177,12 +204,14 @@ module ActionCable end def respond_to_successful_request + logger.info successful_request_message websocket.rack_response end def respond_to_invalid_request close if websocket.alive? + logger.error invalid_request_message logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -197,7 +226,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end @@ -205,10 +234,22 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end + + def invalid_request_message + 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end + + def successful_request_message + 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end end end end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..6f29f32ea9 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,155 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, event_loop, protocols) + @env = env + @event_target = event_target + @event_loop = event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + @close_params = ['', 1006] + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self, protocols: protocols) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@event_loop, self) + end + + def start_driver + return if @driver.nil? || @driver_started + @stream.hijack_rack_socket + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + rescue => e + emit_error e.message + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + def protocol + @driver.protocol + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + @stream.shutdown if @stream + finalize_close + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + @event_target.on_close(*@close_params) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb new file mode 100644 index 0000000000..a4bfe7db17 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -0,0 +1,48 @@ +require 'faye/websocket' + +module ActionCable + module Connection + class FayeClientSocket + def initialize(env, event_target, stream_event_loop, protocols) + @env = env + @event_target = event_target + @protocols = protocols + + @faye = nil + end + + def alive? + @faye && @faye.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + connect + @faye.send data + end + + def close + @faye && @faye.close + end + + def protocol + @faye && @faye.protocol + end + + def rack_response + connect + @faye.rack_response + end + + private + def connect + return if @faye + @faye = Faye::WebSocket.new(@env, @protocols) + + @faye.on(:open) { |event| @event_target.on_open } + @faye.on(:message) { |event| @event_target.on_message(event.data) } + @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb new file mode 100644 index 0000000000..9c44b38bc3 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -0,0 +1,44 @@ +require 'thread' + +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module Connection + class FayeEventLoop + @@mutex = Mutex.new + + def timer(interval, &block) + ensure_reactor_running + EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) + end + + def post(task = nil, &block) + task ||= block + + ensure_reactor_running + ::EM.next_tick(&task) + end + + private + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + + class EMTimer + def initialize(inner) + @inner = inner + end + + def shutdown + @inner.cancel + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..4a54044aff 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,24 +11,22 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] @@ -31,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -39,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..0cf59091bc --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,57 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream # :nodoc: + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError, Errno::ECONNRESET + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + private + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..2abad09c03 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,103 @@ +require 'nio' +require 'thread' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = @thread = nil + @map = {} + @stopping = false + @todo = Queue.new + + @spawn_mutex = Mutex.new + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + + Concurrent.global_io_executor << task + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister io + @map.delete io + end + wakeup + end + + def stop + @stopping = true + wakeup if @nio + end + + private + def spawn + return if @thread && @thread.status + + @spawn_mutex.synchronize do + return if @thread && @thread.status + + @nio ||= NIO::Selector.new + @thread = Thread.new { run } + + return true + end + end + + def wakeup + spawn || @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + next unless monitors = @nio.select + + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue + # We expect one of EOFError or Errno::ECONNRESET in + # normal operation (when the client goes away). But if + # anything else goes wrong, this is still the best way + # to handle it. + begin + stream.close + rescue + @nio.deregister io + @map.delete io + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..6051818bfb 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access' module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} @@ -26,12 +26,12 @@ module ActionCable id_key = data['identifier'] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - subscription_klass = connection.server.channel_classes[id_options[:channel]] + subscription_klass = id_options[:channel].safe_constantize - if subscription_klass + if subscription_klass && ActionCable::Channel::Base >= subscription_klass subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else - logger.error "Subscription class not found (#{data.inspect})" + logger.error "Subscription class not found: #{id_options[:channel].inspect}" end end @@ -54,7 +54,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..11f28c37e8 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil end def possible? @@ -15,11 +13,23 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def protocol + websocket.protocol + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index f5e233e091..96176e0014 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -4,9 +4,9 @@ require "action_cable/helpers/action_cable_helper" require "active_support/core_ext/hash/indifferent_access" module ActionCable - class Railtie < Rails::Engine # :nodoc: + class Engine < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.url = '/cable' + config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] config.eager_load_namespaces << ActionCable @@ -31,8 +31,47 @@ module ActionCable self.cable = Rails.application.config_for(config_path).with_indifferent_access end + previous_connection_class = self.connection_class + self.connection_class = -> { 'ApplicationCable::Connection'.safe_constantize || previous_connection_class.call } + options.each { |k,v| send("#{k}=", v) } end end + + initializer "action_cable.routes" do + config.after_initialize do |app| + config = app.config + unless config.action_cable.mount_path.nil? + app.routes.prepend do + mount ActionCable.server => config.action_cable.mount_path, internal: true + end + end + end + end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index c652fb91ae..8ba0230d47 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -6,9 +6,9 @@ module ActionCable module VERSION MAJOR = 5 - MINOR = 0 + MINOR = 1 TINY = 0 - PRE = "beta1.1" + PRE = "alpha" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index b82751468a..2081a37db6 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -1,28 +1,39 @@ module ActionCable module Helpers module ActionCableHelper - # Returns an "action-cable-url" meta tag with the value of the url specified in your - # configuration. Ensure this is above your javascript tag: + # Returns an "action-cable-url" meta tag with the value of the URL specified in your + # configuration. Ensure this is above your JavaScript tag: # # <head> # <%= action_cable_meta_tag %> # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> # </head> # - # This is then used by ActionCable to determine the url of your websocket server. + # This is then used by Action Cable to determine the URL of your WebSocket server. # Your CoffeeScript can then connect to the server without needing to specify the - # url directly: + # URL directly: # # #= require cable # @App = {} # App.cable = Cable.createConsumer() # - # Make sure to specify the correct server location in each of your environments - # config file: + # Make sure to specify the correct server location in each of your environment + # config files: + # + # config.action_cable.mount_path = "/cable123" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="/cable123" /> + # + # config.action_cable.url = "ws://actioncable.com" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="ws://actioncable.com" /> # - # config.action_cable.url = "ws://example.com:28080" def action_cable_meta_tag - tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url + tag "meta", name: "action-cable-url", content: ( + ActionCable.server.config.url || + ActionCable.server.config.mount_path || + raise("No Action Cable URL configured -- please configure this at config.action_cable.url") + ) end end end diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index dce637b3ca..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'action_cable/server' -require 'eventmachine' - -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") -end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index aa2fc95d2f..a528024427 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -1,6 +1,7 @@ module ActionCable - # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by - # searching the identifier declared on the connection. Example: + # If you need to disconnect a given connection, you can go through the + # RemoteConnections. You can find the connections you're looking for by + # searching for the identifier declared on the connection. For example: # # module ApplicationCable # class Connection < ActionCable::Connection::Base @@ -11,8 +12,9 @@ module ActionCable # # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # - # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses - # the internal channel that all these servers are subscribed to). + # This will disconnect all the connections established for + # <tt>User.find(1)</tt>, across all servers running on all machines, because + # it uses the internal channel that all of these servers are subscribed to. class RemoteConnections attr_reader :server @@ -25,8 +27,8 @@ module ActionCable end private - # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). - # Exists for the solely for the purpose of calling #disconnect on that connection. + # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>. + # Exists solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..0ad1e408a9 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,9 +1,11 @@ +require 'monitor' + module ActionCable module Server - # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but - # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers. + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. # - # Also, this is the server instance used for broadcasting. See Broadcasting for details. + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. class Base include ActionCable::Server::Broadcasting include ActionCable::Server::Connections @@ -13,13 +15,17 @@ module ActionCable def self.logger; config.logger; end delegate :logger, to: :config + attr_reader :mutex + def initialize + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @pubsub = nil end - # Called by rack to setup the server. + # Called by Rack to setup the server. def call(env) setup_heartbeat_timer - config.connection_class.new(self, env).process + config.connection_class.call.new(self, env).process end # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections. @@ -27,32 +33,48 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections - @remote_connections ||= RemoteConnections.new(self) + @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. - def worker_pool - @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end - # Requires and returns a hash of all the channel class constants keyed by name. - def channel_classes - @channel_classes ||= begin - config.channel_paths.each { |channel_path| require channel_path } - config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } - end + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # database connection pool instead. + def worker_pool + @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end # Adapter used for all streams/broadcasting. def pubsub - @pubsub ||= config.pubsub_adapter.new(self) + @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end - # All the identifiers applied to the connection class associated with this server. + # All of the identifiers applied to the connection class associated with this server. def connection_identifiers - config.connection_class.identifiers + config.connection_class.call.identifiers end end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 4a26ed9269..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,45 +1,46 @@ module ActionCable module Server - # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel - # def subscribed - # stream_from "web_notifications_#{current_user.id}" - # end - # end + # def subscribed + # stream_from "web_notifications_#{current_user.id}" + # end + # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob - # ActionCable.server.broadcast \ - # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' } + # # Somewhere in your app this is called, perhaps from a NewCommentJob: + # ActionCable.server.broadcast \ + # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side coffescript, which assumes you've already requested the right to send web notifications - # App.cable.subscriptions.create "WebNotificationsChannel", - # received: (data) -> - # new Notification data['title'], body: data['body'] + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: + # App.cable.subscriptions.create "WebNotificationsChannel", + # received: (data) -> + # new Notification data['title'], body: data['body'] module Broadcasting - # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end - # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that + # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, broadcasting) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a248933c4..ada1ac22cc 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -1,37 +1,22 @@ module ActionCable module Server - # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :connection_class, :worker_pool_size - attr_accessor :channel_load_paths + attr_accessor :use_faye, :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :cable, :url + attr_accessor :cable, :url, :mount_path def initialize @log_tags = [] - @connection_class = ApplicationCable::Connection - @worker_pool_size = 100 - - @channel_load_paths = [Rails.root.join('app/channels')] + @connection_class = -> { ActionCable::Connection::Base } + @worker_pool_size = 4 @disable_request_forgery_protection = false end - def channel_paths - @channel_paths ||= channel_load_paths.flat_map do |path| - Dir["#{path}/**/*_channel.rb"] - end - end - - def channel_class_names - @channel_class_names ||= channel_paths.collect do |channel_path| - Pathname.new(channel_path).basename.to_s.split('.').first.camelize - end - end - # Returns constant of subscription adapter specified in config/cable.yml. # If the adapter cannot be found, this will default to the Redis adapter. # Also makes sure proper dependencies are required. @@ -50,6 +35,22 @@ module ActionCable adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize end + + def event_loop_class + if use_faye + ActionCable::Connection::FayeEventLoop + else + ActionCable::Connection::StreamEventLoop + end + end + + def client_socket_class + if use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end + end end end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -1,9 +1,8 @@ module ActionCable module Server - # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so - # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. - # As such, this is primarily for internal use. - module Connections + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: BEAT_INTERVAL = 3 def connections @@ -19,13 +18,11 @@ module ActionCable end # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -4,60 +4,60 @@ require 'concurrent' module ActionCable module Server - # Worker used by Server.send_async to do connection work in threads. Only for internal use. - class Worker + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: include ActiveSupport::Callbacks thread_mattr_accessor :connection define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, ) end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) - end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @executor.kill end - def invoke(receiver, method, *args) - begin - self.connection = receiver + def stopping? + @executor.shuttingdown? + end - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + def work(connection) + self.connection = connection - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + run_callbacks :work do + yield end + ensure + self.connection = nil end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection + def invoke(receiver, method, *args, connection:) + work(connection) do + begin + receiver.send method, *args + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + receiver.handle_exception if receiver.respond_to?(:handle_exception) end - ensure - self.connection = nil end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index ecece4e270..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,10 +12,8 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..10b3ac8cd8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -4,17 +4,22 @@ module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private - def subscriber_map - @subscriber_map ||= AsyncSubscriberMap.new + def new_subscriber_map + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - ::EM.next_tick { super } + @event_loop.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..4735a4bfa8 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,79 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| + redis.on(:reconnect_failed) do + @logger.error "[ActionCable] Redis reconnect failed." + end + + redis.on(:failed) do + @logger.error "[ActionCable] Redis connection has failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 4a2a8d23a2..81357faead 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -1,6 +1,11 @@ module ActionCable module SubscriptionAdapter class Inline < Base # :nodoc: + def initialize(*) + super + @subscriber_map = nil + end + def broadcast(channel, payload) subscriber_map.broadcast(channel, payload) end @@ -19,7 +24,11 @@ module ActionCable private def subscriber_map - @subscriber_map ||= SubscriberMap.new + @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map } + end + + def new_subscriber_map + SubscriberMap.new end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,14 +42,15 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -63,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +99,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,41 +1,171 @@ -gem 'em-hiredis', '~> 0.3.0' +require 'thread' + gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + + def initialize(*) + super + @listener = nil + @redis_connection_for_broadcasts = nil + end + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + redis_connection end private - def redis_connection_for_subscriptions - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= redis_connection + end + end + + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + + class Listener < SubscriberMap + def initialize(adapter, event_loop) + super() + + @adapter = adapter + @event_loop = event_loop + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + @event_loop.post(&next_callback) if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + @event_loop.post { super } + end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/lib/assets/javascripts/action_cable.coffee.erb deleted file mode 100644 index 7daea4ebcd..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require action_cable/consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee deleted file mode 100644 index fcd8d0fb6c..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require action_cable/connection -#= require action_cable/connection_monitor -#= require action_cable/subscriptions -#= require action_cable/subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index 27a934c689..6249553c22 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -3,7 +3,7 @@ Description: Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascript/cable.coffee after generating any channels. + Note: Turn on the cable connection in app/assets/javascript/cable.js after generating any channels. Example: ======== diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c5d398810a..47232252e3 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -13,13 +13,34 @@ module Rails template "channel.rb", File.join('app/channels', class_path, "#{file_name}_channel.rb") if options[:assets] - template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") + if self.behavior == :invoke + template "assets/cable.js", "app/assets/javascripts/cable.js" + end + + js_template "assets/channel", File.join('app/assets/javascripts/channels', class_path, "#{file_name}") end + + generate_application_cable_files end protected def file_name - @_file_name ||= super.gsub(/\_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, '') + end + + # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. + def generate_application_cable_files + return if self.behavior != :invoke + + files = [ + 'application_cable/channel.rb', + 'application_cable/connection.rb' + ] + + files.each do |name| + path = File.join('app/channels/', name) + template(name, path) if !File.exist?(path) + end end end end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb new file mode 100644 index 0000000000..17a85f60f9 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -0,0 +1,6 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in +# a loop that does not support auto reloading. +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb new file mode 100644 index 0000000000..93f28c4306 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -0,0 +1,6 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in +# a loop that does not support auto reloading. +module ApplicationCable + class Connection < ActionCable::Connection::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/assets/cable.js b/actioncable/lib/rails/generators/channel/templates/assets/cable.js new file mode 100644 index 0000000000..71ee1e66de --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/assets/cable.js @@ -0,0 +1,13 @@ +// Action Cable provides the framework to deal with WebSockets in Rails. +// You can generate new channels where WebSocket features live using the rails generate channel command. +// +//= require action_cable +//= require_self +//= require_tree ./channels + +(function() { + this.App || (this.App = {}); + + App.cable = ActionCable.createConsumer(); + +}).call(this); diff --git a/actioncable/lib/rails/generators/channel/templates/assets/channel.js b/actioncable/lib/rails/generators/channel/templates/assets/channel.js new file mode 100644 index 0000000000..ab0e68b11a --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/assets/channel.js @@ -0,0 +1,18 @@ +App.<%= class_name.underscore %> = App.cable.subscriptions.create("<%= class_name %>Channel", { + connected: function() { + // Called when the subscription is ready for use on the server + }, + + disconnected: function() { + // Called when the subscription has been terminated by the server + }, + + received: function(data) { + // Called when there's incoming data on the websocket for this channel + }<%= actions.any? ? ",\n" : '' %> +<% actions.each do |action| -%> + <%=action %>: function() { + return this.perform('<%= action %>'); + }<%= action == actions[-1] ? '' : ",\n" %> +<% end -%> +}); diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed |