diff options
Diffstat (limited to 'actioncable')
42 files changed, 795 insertions, 245 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index fb9e498e49..5162a31cf8 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,7 +1,36 @@ -* Allow channel identifiers with no backslahes/escaping to be accepted - by the subscription storer. +* WebSocket protocol negotiation. - *Jon Moss* + Introduces an Action Cable protocol version that moves independently + of and, hopefully, more slowly than Action Cable itself. Client sockets + negotiate a protocol with the Cable server using WebSockets' native + subprotocol support: + * https://tools.ietf.org/html/rfc6455#section-1.9 + * https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Subprotocols + + If they can't negotiate a compatible protocol (usually due to upgrading + the Cable server with a browser still running old JavaScript) then the + client knows to disconnect, cease retrying, and tell the app that it hit + a protocol mismatch. + + This allows us to evolve the Action Cable message format, handshaking, + pings, acknowledgements, and more without breaking older clients' + expectations of server behavior. + + *Daniel Rhodes* + +* Pubsub: automatic stream decoding. + + stream_for @room, coder: ActiveSupport::JSON do |message| + # `message` is a Ruby hash here instead of a JSON string + + The `coder` must respond to `#decode`. Defaults to `coder: nil` + which skips decoding entirely. + + *Jeremy Daer* + +* Add ActiveSupport::Notifications to ActionCable::Channel. + + *Matthew Wear* * Safely support autoloading and class unloading, by preventing concurrent loads, and disconnecting all cables during reload. diff --git a/actioncable/README.md b/actioncable/README.md index 595830feb0..fe4d213485 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -178,7 +178,7 @@ App.cable.subscriptions.create "AppearanceChannel", ``` Simply calling `App.cable.subscriptions.create` will setup the subscription, which will call `AppearanceChannel#subscribed`, -which in turn is linked to original `App.cable` -> `ApplicationCable::Connection` instances. +which in turn is linked to the original `App.cable` -> `ApplicationCable::Connection` instances. Next, we link the client-side `appear` method to `AppearanceChannel#appear(data)`. This is possible because the server-side channel instance will automatically expose the public methods declared on the class (minus the callbacks), so that these @@ -412,12 +412,12 @@ The above will start a cable server on port 28080. ### In app -If you are using a server that supports the [Rack socket hijacking API](http://www.rubydoc.info/github/rack/rack/file/SPEC#Hijacking), Action Cable can run alongside your Rails application. For example, to listen for WebSocket requests on `/cable`, mount the server at that path: +If you are using a server that supports the [Rack socket hijacking API](http://www.rubydoc.info/github/rack/rack/file/SPEC#Hijacking), Action Cable can run alongside your Rails application. For example, to listen for WebSocket requests on `/websocket`, specify that path to `config.action_cable.mount_path`: ```ruby -# config/routes.rb -Example::Application.routes.draw do - mount ActionCable.server => '/cable' +# config/application.rb +class Application < Rails::Application + config.action_cable.mount_path = '/websocket' end ``` diff --git a/actioncable/Rakefile b/actioncable/Rakefile index 1d77fc7067..5ba7b7f7f6 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -19,6 +19,14 @@ Rake::TestTask.new do |t| t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION) end +namespace :test do + task :isolated do + Dir.glob("test/**/*_test.rb").all? do |file| + sh(Gem.ruby, '-w', '-Ilib:test', file) + end or raise "Failures" + end +end + namespace :assets do root_path = Pathname.new(dir) destination_path = root_path.join("lib/assets/compiled") diff --git a/actioncable/app/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee index 92272cc5b8..d6a6397804 100644 --- a/actioncable/app/assets/javascripts/action_cable/connection.coffee +++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee @@ -2,7 +2,8 @@ # Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. -{message_types} = ActionCable.INTERNAL +{message_types, protocols} = ActionCable.INTERNAL +[supportedProtocols..., unsupportedProtocol] = protocols class ActionCable.Connection @reopenDelay: 500 @@ -10,6 +11,7 @@ class ActionCable.Connection constructor: (@consumer) -> {@subscriptions} = @consumer @monitor = new ActionCable.ConnectionMonitor this + @disconnected = true send: (data) -> if @isOpen() @@ -20,18 +22,19 @@ class ActionCable.Connection open: => if @isActive() - ActionCable.log("Attemped to open WebSocket, but existing socket is #{@getState()}") + ActionCable.log("Attempted to open WebSocket, but existing socket is #{@getState()}") throw new Error("Existing connection must be closed before opening") else - ActionCable.log("Opening WebSocket, current state is #{@getState()}") + ActionCable.log("Opening WebSocket, current state is #{@getState()}, subprotocols: #{protocols}") @uninstallEventHandlers() if @webSocket? - @webSocket = new WebSocket(@consumer.url) + @webSocket = new WebSocket(@consumer.url, protocols) @installEventHandlers() @monitor.start() true - close: -> - @webSocket?.close() + close: ({allowReconnect} = {allowReconnect: true}) -> + @monitor.stop() unless allowReconnect + @webSocket?.close() if @isActive() reopen: -> ActionCable.log("Reopening WebSocket, current state is #{@getState()}") @@ -46,6 +49,9 @@ class ActionCable.Connection else @open() + getProtocol: -> + @webSocket?.protocol + isOpen: -> @isState("open") @@ -54,6 +60,9 @@ class ActionCable.Connection # Private + isProtocolSupported: -> + @getProtocol() in supportedProtocols + isState: (states...) -> @getState() in states @@ -74,10 +83,12 @@ class ActionCable.Connection events: message: (event) -> + return unless @isProtocolSupported() {identifier, message, type} = JSON.parse(event.data) switch type when message_types.welcome @monitor.recordConnect() + @subscriptions.reload() when message_types.ping @monitor.recordPing() when message_types.confirmation @@ -88,20 +99,18 @@ class ActionCable.Connection @subscriptions.notify(identifier, "received", message) open: -> - ActionCable.log("WebSocket onopen event") + ActionCable.log("WebSocket onopen event, using '#{@getProtocol()}' subprotocol") @disconnected = false - @subscriptions.reload() + if not @isProtocolSupported() + ActionCable.log("Protocol is unsupported. Stopping monitor and disconnecting.") + @close(allowReconnect: false) - close: -> + close: (event) -> ActionCable.log("WebSocket onclose event") - @disconnect() + return if @disconnected + @disconnected = true + @monitor.recordDisconnect() + @subscriptions.notifyAll("disconnected", {willAttemptReconnect: @monitor.isRunning()}) error: -> ActionCable.log("WebSocket onerror event") - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @subscriptions.notifyAll("disconnected") - @monitor.recordDisconnect() diff --git a/actioncable/app/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee index 7aae1ed8ed..3298be717f 100644 --- a/actioncable/app/assets/javascripts/action_cable/consumer.coffee +++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee @@ -14,6 +14,19 @@ # App.appearance = App.cable.subscriptions.create "AppearanceChannel" # # For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +# +# When a consumer is created, it automatically connects with the server. +# +# To disconnect from the server, call +# +# App.cable.disconnect() +# +# and to restart the connection: +# +# App.cable.connect() +# +# Any channel subscriptions which existed prior to disconnecting will +# automatically resubscribe. class ActionCable.Consumer constructor: (@url) -> @subscriptions = new ActionCable.Subscriptions this @@ -22,6 +35,12 @@ class ActionCable.Consumer send: (data) -> @connection.send(data) + connect: -> + @connection.open() + + disconnect: -> + @connection.close(allowReconnect: false) + ensureActiveConnection: -> unless @connection.isActive() @connection.open() diff --git a/actioncable/app/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee index 61a3fb1309..8e0805a174 100644 --- a/actioncable/app/assets/javascripts/action_cable/subscription.coffee +++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee @@ -8,6 +8,12 @@ # connected: -> # # Called once the subscription has been successfully completed # +# disconnected: ({ willAttemptReconnect: boolean }) -> +# # Called when the client has disconnected with the server. +# # The object will have an `willAttemptReconnect` property which +# # says whether the client has the intention of attempting +# # to reconnect. +# # appear: -> # @perform 'appear', appearing_on: @appearingOn() # diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 68a5fff3e7..b6d2842867 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -35,7 +35,8 @@ module ActionCable confirmation: 'confirm_subscription'.freeze, rejection: 'reject_subscription'.freeze }, - default_mount_path: '/cable'.freeze + default_mount_path: '/cable'.freeze, + protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 714d9887d4..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -160,7 +160,10 @@ module ActionCable action = extract_action(data) if processable_action?(action) - dispatch_action(action, data) + payload = { channel_class: self.class.name, action: action, data: data } + ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do + dispatch_action(action, data) + end else logger.error "Unable to process #{action_signature(action, data)}" end @@ -192,7 +195,11 @@ module ActionCable # the proper channel identifier marked as the recipient. def transmit(data, via: nil) logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + + payload = { channel_class: self.class.name, data: data, via: via } + ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do + connection.transmit identifier: @identifier, message: data + end end def defer_subscription_confirmation! @@ -265,8 +272,11 @@ module ActionCable def transmit_subscription_confirmation unless subscription_confirmation_sent? logger.info "#{self.class.name} is transmitting the subscription confirmation" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) - @subscription_confirmation_sent = true + + ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] + @subscription_confirmation_sent = true + end end end @@ -277,7 +287,10 @@ module ActionCable def transmit_subscription_rejection logger.info "#{self.class.name} is transmitting the subscription rejection" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + + ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] + end end end end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index b414255707..dab604440f 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,11 +12,42 @@ module ActionCable end module ClassMethods - # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful - # for sending a steady flow of updates to a client based off an object that was configured on subscription. - # It's an alternative to using streams if the channel is able to do the work internally. - def periodically(callback, every:) - self.periodic_timers += [ [ callback, every: every ] ] + # Periodically performs a task on the channel, like updating an online + # user counter, polling a backend for new status messages, sending + # regular "heartbeat" messages, or doing some internal work and giving + # progress updates. + # + # Pass a method name or lambda argument or provide a block to call. + # Specify the calling period in seconds using the <tt>every:</tt> + # keyword argument. + # + # periodically :transmit_progress, every: 5.seconds + # + # periodically every: 3.minutes do + # transmit action: :update_count, count: current_count + # end + # + def periodically(callback_or_method_name = nil, every:, &block) + callback = + if block_given? + raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + block + else + case callback_or_method_name + when Proc + callback_or_method_name + when Symbol + -> { __send__ callback_or_method_name } + else + raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}" + end + end + + unless every.kind_of?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + self.periodic_timers += [[ callback, every: every ]] end end @@ -27,14 +58,21 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << connection.server.event_loop.timer(options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) + active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every)) + end + end + + def start_periodic_timer(callback, every:) + connection.server.event_loop.timer every do + connection.worker_pool.async_invoke connection do + instance_exec(&callback) end end end def stop_periodic_timers active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.clear end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..200c9d053c 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -2,7 +2,7 @@ module ActionCable module Channel # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,21 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +93,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,11 +115,60 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b4488265cb..cc4e0f8c8b 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -40,7 +40,7 @@ module ActionCable # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # - # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. + # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log. # # Pretty simple, eh? class Base @@ -48,11 +48,11 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger, :worker_pool + attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @@ -67,7 +67,7 @@ module ActionCable # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. - def process # :nodoc: + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -77,20 +77,22 @@ module ActionCable end end - # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) # :nodoc: - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -115,7 +117,7 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -152,7 +154,16 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open + @protocol = websocket.protocol connect if respond_to?(:connect) subscribe_to_internal_channel send_welcome_message @@ -178,7 +189,7 @@ module ActionCable # Send welcome message to the internal connection monitor channel. # This ensures the connection monitor state is reset after a successful # websocket connection. - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome]) + transmit type: ActionCable::INTERNAL[:message_types][:welcome] end def allow_request_origin? diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 9e4dbcd6e6..6f29f32ea9 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -29,7 +29,7 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, event_loop) + def initialize(env, event_target, event_loop, protocols) @env = env @event_target = event_target @event_loop = event_loop @@ -42,7 +42,7 @@ module ActionCable @ready_state = CONNECTING # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) + @driver = ::WebSocket::Driver.rack(self, protocols: protocols) @driver.on(:open) { |e| open } @driver.on(:message) { |e| receive_message(e.data) } @@ -71,6 +71,8 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) @@ -109,6 +111,10 @@ module ActionCable @ready_state == OPEN end + def protocol + @driver.protocol + end + private def open return unless @ready_state == CONNECTING diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb index c9139b6858..a4bfe7db17 100644 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -3,9 +3,10 @@ require 'faye/websocket' module ActionCable module Connection class FayeClientSocket - def initialize(env, event_target, stream_event_loop) + def initialize(env, event_target, stream_event_loop, protocols) @env = env @event_target = event_target + @protocols = protocols @faye = nil end @@ -23,6 +24,10 @@ module ActionCable @faye && @faye.close end + def protocol + @faye && @faye.protocol + end + def rack_response connect @faye.rack_response @@ -31,11 +36,12 @@ module ActionCable private def connect return if @faye - @faye = Faye::WebSocket.new(@env) + @faye = Faye::WebSocket.new(@env, @protocols) @faye.on(:open) { |event| @event_target.on_open } @faye.on(:message) { |event| @event_target.on_message(event.data) } @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } end end end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb index 8b70f3d84e..9c44b38bc3 100644 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -36,7 +36,7 @@ module ActionCable end def shutdown - inner.cancel + @inner.cancel end end end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 3c5d39f59a..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,7 +11,7 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] @@ -27,8 +27,6 @@ module ActionCable end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 19f2e6e918..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -30,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -38,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 2d97b28c09..0cf59091bc 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -29,7 +29,7 @@ module ActionCable def write(data) return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data) if @stream_send - rescue EOFError + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 5aa907c2d3..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -23,13 +23,13 @@ module ActionCable end def add(data) - id_options = decode_hash(data['identifier']) - identifier = normalize_identifier(id_options) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.channel_classes[id_options[:channel]] if subscription_klass - subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options) + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else logger.error "Subscription class not found (#{data.inspect})" end @@ -37,7 +37,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[normalize_identifier(data['identifier'])] + remove_subscription subscriptions[data['identifier']] end def remove_subscription(subscription) @@ -46,7 +46,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action(decode_hash(data['data'])) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) end def identifiers @@ -63,21 +63,8 @@ module ActionCable private delegate :logger, to: :connection - def normalize_identifier(identifier) - identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash) - identifier - end - - # If `data` is a Hash, this means that the original JSON - # sent by the client had no backslashes in it, and does - # not need to be decoded again. - def decode_hash(data) - data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash) - data.with_indifferent_access - end - def find(data) - if subscription = subscriptions[normalize_identifier(data['identifier'])] + if subscription = subscriptions[data['identifier']] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 0bec9b6a96..11f28c37e8 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil + def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil end def possible? @@ -24,6 +24,10 @@ module ActionCable websocket.close end + def protocol + websocket.protocol + end + def rack_response websocket.rack_response end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 778f5ffeed..b1a0e11631 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -52,7 +52,17 @@ module ActionCable @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # db connection pool instead. def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 98025f27f2..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -19,27 +19,28 @@ module ActionCable # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, String(broadcasting)) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 5fe71caed2..0bb378cf03 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -14,7 +14,7 @@ module ActionCable @log_tags = [] @connection_class = ActionCable::Connection::Base - @worker_pool_size = 100 + @worker_pool_size = 4 @disable_request_forgery_protection = false end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 49cbaec0c0..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e @@ -59,18 +61,6 @@ module ActionCable end end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) - end - end - - def run_periodic_timer(channel, callback) - work(channel.connection) do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - private def logger diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index e6c862959b..256876cf30 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -51,7 +51,7 @@ module ActionCable @redis_connection_for_subscriptions || @server.mutex.synchronize do @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @logger.error "[ActionCable] Redis reconnect failed." end end end diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index d89ab45816..05fd21a954 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -13,6 +13,9 @@ module Rails template "channel.rb", File.join('app/channels', class_path, "#{file_name}_channel.rb") if options[:assets] + if self.behavior == :invoke + template "assets/cable.js", "app/assets/javascripts/cable.js" + end template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") end diff --git a/actioncable/lib/rails/generators/channel/templates/assets/cable.js b/actioncable/lib/rails/generators/channel/templates/assets/cable.js new file mode 100644 index 0000000000..71ee1e66de --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/assets/cable.js @@ -0,0 +1,13 @@ +// Action Cable provides the framework to deal with WebSockets in Rails. +// You can generate new channels where WebSocket features live using the rails generate channel command. +// +//= require action_cable +//= require_self +//= require_tree ./channels + +(function() { + this.App || (this.App = {}); + + App.cable = ActionCable.createConsumer(); + +}).call(this); diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index d41bf3064b..daa782eeb3 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -146,12 +146,12 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "transmitting data" do @channel.perform_action 'action' => :get_latest - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" } + expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" }} assert_equal expected, @connection.last_transmission end test "subscription confirmation" do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } assert_equal expected, @connection.last_transmission end @@ -166,6 +166,81 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end end + test "notification for perform_action" do + begin + events = [] + ActiveSupport::Notifications.subscribe "perform_action.action_cable" do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + data = {'action' => :speak, 'content' => 'hello'} + @channel.perform_action data + + assert_equal 1, events.length + assert_equal 'perform_action.action_cable', events[0].name + assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal :speak, events[0].payload[:action] + assert_equal data, events[0].payload[:data] + ensure + ActiveSupport::Notifications.unsubscribe "perform_action.action_cable" + end + end + + test "notification for transmit" do + begin + events = [] + ActiveSupport::Notifications.subscribe 'transmit.action_cable' do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + @channel.perform_action 'action' => :get_latest + expected_data = {data: 'latest'} + + assert_equal 1, events.length + assert_equal 'transmit.action_cable', events[0].name + assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal expected_data, events[0].payload[:data] + assert_nil events[0].payload[:via] + ensure + ActiveSupport::Notifications.unsubscribe 'transmit.action_cable' + end + end + + test "notification for transmit_subscription_confirmation" do + begin + events = [] + ActiveSupport::Notifications.subscribe 'transmit_subscription_confirmation.action_cable' do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + @channel.stubs(:subscription_confirmation_sent?).returns(false) + @channel.send(:transmit_subscription_confirmation) + + assert_equal 1, events.length + assert_equal 'transmit_subscription_confirmation.action_cable', events[0].name + assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + ensure + ActiveSupport::Notifications.unsubscribe 'transmit_subscription_confirmation.action_cable' + end + end + + test "notification for transmit_subscription_rejection" do + begin + events = [] + ActiveSupport::Notifications.subscribe 'transmit_subscription_rejection.action_cable' do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + @channel.send(:transmit_subscription_rejection) + + assert_equal 1, events.length + assert_equal 'transmit_subscription_rejection.action_cable', events[0].name + assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + ensure + ActiveSupport::Notifications.unsubscribe 'transmit_subscription_rejection.action_cable' + end + end + private def assert_logged(message) old_logger = @connection.logger diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index e6f0c14c9d..03464003cf 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -1,12 +1,21 @@ require 'test_helper' require 'stubs/test_connection' require 'stubs/room' +require 'active_support/time' class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase class ChatChannel < ActionCable::Channel::Base - periodically -> { ping }, every: 5 + # Method name arg periodically :send_updates, every: 1 + # Proc arg + periodically -> { ping }, every: 2 + + # Block arg + periodically every: 3 do + ping + end + private def ping end @@ -19,22 +28,41 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase test "periodic timers definition" do timers = ChatChannel.periodic_timers - assert_equal 2, timers.size + assert_equal 3, timers.size - first_timer = timers[0] - assert_kind_of Proc, first_timer[0] - assert_equal 5, first_timer[1][:every] + timers.each_with_index do |timer, i| + assert_kind_of Proc, timer[0] + assert_equal i+1, timer[1][:every] + end + end - second_timer = timers[1] - assert_equal :send_updates, second_timer[0] - assert_equal 1, second_timer[1][:every] + test 'disallow negative and zero periods' do + [ 0, 0.0, 0.seconds, -1, -1.seconds, 'foo', :foo, Object.new ].each do |invalid| + assert_raise ArgumentError, /Expected every:/ do + ChatChannel.periodically :send_updates, every: invalid + end + end + end + + test 'disallow block and arg together' do + assert_raise ArgumentError, /not both/ do + ChatChannel.periodically(:send_updates, every: 1) { ping } + end + end + + test 'disallow unknown args' do + [ 'send_updates', Object.new, nil ].each do |invalid| + assert_raise ArgumentError, /Expected a Symbol/ do + ChatChannel.periodically invalid, every: 1 + end + end end test "timer start and stop" do - @connection.server.event_loop.expects(:timer).times(2).returns(true) + @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } - channel.expects(:stop_periodic_timers).once channel.unsubscribe_from_channel + assert_equal [], channel.send(:active_periodic_timers) end end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index aa93396d44..15db57d6ba 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -18,7 +18,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", { id: 1 } - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription" + expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 526ea92e4f..0b0c72ccf6 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -2,18 +2,43 @@ require 'test_helper' require 'stubs/test_connection' require 'stubs/room' -class ActionCable::Channel::StreamTest < ActionCable::TestCase +module ActionCable::StreamTests + class Connection < ActionCable::Connection::Base + attr_reader :websocket + + def send_async(method, *args) + send method, *args + end + end + class ChatChannel < ActionCable::Channel::Base def subscribed if params[:id] @room = Room.new params[:id] - stream_from "test_room_#{@room.id}" + stream_from "test_room_#{@room.id}", coder: pick_coder(params[:coder]) end end def send_confirmation transmit_subscription_confirmation end + + private def pick_coder(coder) + case coder + when nil, 'json' + ActiveSupport::JSON + when 'custom' + DummyEncoder + when 'none' + nil + end + end + end + + module DummyEncoder + extend self + def encode(*) '{ "foo": "encoded" }' end + def decode(*) { foo: 'decoded' } end end class SymbolChannel < ActionCable::Channel::Base @@ -22,69 +47,137 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end end - test "streaming start and stop" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = ChatChannel.new connection, "{id: 1}", { id: 1 } + class StreamTest < ActionCable::TestCase + test "streaming start and stop" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } - channel.unsubscribe_from_channel + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } + channel.unsubscribe_from_channel + end end - end - test "stream from non-string channel" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = SymbolChannel.new connection, "" + test "stream from non-string channel" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + channel = SymbolChannel.new connection, "" - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } - channel.unsubscribe_from_channel + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } + channel.unsubscribe_from_channel + end end - end - test "stream_for" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + test "stream_for" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = ChatChannel.new connection, "" - channel.stream_for Room.new(1) + channel = ChatChannel.new connection, "" + channel.stream_for Room.new(1) + end end - end - test "stream_from subscription confirmation" do - run_in_eventmachine do - connection = TestConnection.new + test "stream_from subscription confirmation" do + run_in_eventmachine do + connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", { id: 1 } - assert_nil connection.last_transmission + ChatChannel.new connection, "{id: 1}", { id: 1 } + assert_nil connection.last_transmission - wait_for_async + wait_for_async - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + confirmation = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + connection.transmit(confirmation) - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + assert_equal confirmation, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + end end - end - test "subscription confirmation should only be sent out once" do - run_in_eventmachine do - connection = TestConnection.new + test "subscription confirmation should only be sent out once" do + run_in_eventmachine do + connection = TestConnection.new + + channel = ChatChannel.new connection, "test_channel" + channel.send_confirmation + channel.send_confirmation + + wait_for_async - channel = ChatChannel.new connection, "test_channel" - channel.send_confirmation - channel.send_confirmation + expected = { "identifier" => "test_channel", "type" => "confirm_subscription" } + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" - wait_for_async + assert_equal 1, connection.transmissions.size + end + end + end - expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" + require 'action_cable/subscription_adapter/inline' - assert_equal 1, connection.transmissions.size + class UserCallbackChannel < ActionCable::Channel::Base + def subscribed + stream_from :channel do + Thread.current[:ran_callback] = true + end end end + class StreamEncodingTest < ActionCable::TestCase + setup do + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + @server.stubs(:channel_classes).returns( + ChatChannel.name => ChatChannel, + UserCallbackChannel.name => UserCallbackChannel, + ) + end + + test 'custom encoder' do + run_in_eventmachine do + connection = open_connection + subscribe_to connection, identifiers: { id: 1 } + + connection.websocket.expects(:transmit) + @server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder + wait_for_async + wait_for_executor connection.server.worker_pool.executor + end + end + + test "user supplied callbacks are run through the worker pool" do + run_in_eventmachine do + connection = open_connection + receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 }) + + @server.broadcast 'channel', {} + wait_for_async + refute Thread.current[:ran_callback], "User callback was not run through the worker pool" + end + end + + private + def subscribe_to(connection, identifiers:) + receive connection, command: 'subscribe', identifiers: identifiers + end + + def open_connection + env = Rack::MockRequest.env_for '/test', 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + + Connection.new(@server, env).tap do |connection| + connection.process + assert connection.websocket.possible? + + wait_for_async + assert connection.websocket.alive? + end + end + + def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel') + identifier = JSON.generate(channel: channel, **identifiers) + connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier) + wait_for_async + end + end end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 5f5c09d1a1..fe503fd703 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -75,11 +75,11 @@ class ClientTest < ActionCable::TestCase end @ws.on(:message) do |event| - hash = JSON.parse(event.data) - if hash['type'] == 'ping' + message = JSON.parse(event.data) + if message['type'] == 'ping' @pings += 1 else - @messages << hash + @messages << message @has_messages.release end end @@ -116,8 +116,8 @@ class ClientTest < ActionCable::TestCase list end - def send_message(hash) - @ws.send(JSON.dump(hash)) + def send_message(message) + @ws.send(JSON.generate(message)) end def close @@ -148,9 +148,9 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) c.close end @@ -165,12 +165,12 @@ class ClientTest < ActionCable::TestCase clients.map {|c| Concurrent::Future.execute { assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) barrier_1.wait WAIT_WHEN_EXPECTING_EVENT - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello') barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size } }.each(&:wait!) @@ -185,9 +185,9 @@ class ClientTest < ActionCable::TestCase clients.map {|c| Concurrent::Future.execute { assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) } }.each(&:wait!) @@ -199,16 +199,16 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello') c.close # disappear before write c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) c.close # disappear before read end @@ -217,7 +217,7 @@ class ClientTest < ActionCable::TestCase def test_unsubscribe_client with_puma_server do |port| app = ActionCable.server - identifier = JSON.dump(channel: 'EchoChannel') + identifier = JSON.generate(channel: 'EchoChannel') c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) @@ -226,7 +226,9 @@ class ClientTest < ActionCable::TestCase assert_equal(1, app.connections.count) assert(app.remote_connections.where(identifier: identifier)) - channel = app.connections.first.subscriptions.send(:subscriptions).first[1] + subscriptions = app.connections.first.subscriptions.send(:subscriptions) + assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription' + channel = subscriptions.first[1] channel.expects(:unsubscribed) c.close sleep 0.1 # Data takes a moment to process @@ -240,7 +242,7 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) ActionCable.server.restart diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb new file mode 100644 index 0000000000..4af071b4da --- /dev/null +++ b/actioncable/test/connection/client_socket_test.rb @@ -0,0 +1,64 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase + class Connection < ActionCable::Connection::Base + attr_reader :connected, :websocket, :errors + + def initialize(*) + super + @errors = [] + end + + def connect + @connected = true + end + + def disconnect + @connected = false + end + + def send_async(method, *args) + send method, *args + end + + def on_error(message) + @errors << message + end + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + test 'delegate socket errors to on_error handler' do + skip if ENV['FAYE'].present? + + run_in_eventmachine do + connection = open_connection + + # Internal hax = :( + client = connection.websocket.send(:websocket) + client.instance_variable_get('@stream').expects(:write).raises('foo') + client.expects(:client_gone).never + + client.write('boo') + assert_equal %w[ foo ], connection.errors + end + end + + private + def open_connection + env = Rack::MockRequest.env_for '/test', + 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + + Connection.new(@server, env).tap do |connection| + connection.process + connection.send :handle_open + assert connection.connected + end + end +end diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index c3d5f1f90b..b48d9af809 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -40,8 +40,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close) - message = ActiveSupport::JSON.encode('type' => 'disconnect') - @connection.process_internal_message message + @connection.process_internal_message 'type' => 'disconnect' end end @@ -50,8 +49,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close).never - message = ActiveSupport::JSON.encode('type' => 'unknown') - @connection.process_internal_message message + @connection.process_internal_message 'type' => 'unknown' end end diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb new file mode 100644 index 0000000000..a7a61d8d6f --- /dev/null +++ b/actioncable/test/connection/stream_test.rb @@ -0,0 +1,66 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::Connection::StreamTest < ActionCable::TestCase + class Connection < ActionCable::Connection::Base + attr_reader :connected, :websocket, :errors + + def initialize(*) + super + @errors = [] + end + + def connect + @connected = true + end + + def disconnect + @connected = false + end + + def send_async(method, *args) + send method, *args + end + + def on_error(message) + @errors << message + end + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + [ EOFError, Errno::ECONNRESET ].each do |closed_exception| + test "closes socket on #{closed_exception}" do + skip if ENV['FAYE'].present? + + run_in_eventmachine do + connection = open_connection + + # Internal hax = :( + client = connection.websocket.send(:websocket) + client.instance_variable_get('@stream').instance_variable_get('@rack_hijack_io').expects(:write).raises(closed_exception, 'foo') + client.expects(:client_gone) + + client.write('boo') + assert_equal [], connection.errors + end + end + end + + private + def open_connection + env = Rack::MockRequest.env_for '/test', + 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + + Connection.new(@server, env).tap do |connection| + connection.process + connection.send :handle_open + assert connection.connected + end + end +end diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb index f91597f567..53e8547245 100644 --- a/actioncable/test/connection/subscriptions_test.rb +++ b/actioncable/test/connection/subscriptions_test.rb @@ -88,7 +88,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase channel1 = subscribe_to_chat_channel - channel2_id = ActiveSupport::JSON.encode({ id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel' }) + channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') channel2 = subscribe_to_chat_channel(channel2_id) channel1.expects(:unsubscribe_from_channel) diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index 8ba284fdc6..885450dda6 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -3,24 +3,31 @@ require 'stubs/user' class TestConnection attr_reader :identifiers, :logger, :current_user, :server, :transmissions - def initialize(user = User.new("lifo")) + delegate :pubsub, to: :server + + def initialize(user = User.new("lifo"), coder: ActiveSupport::JSON, subscription_adapter: SuccessAdapter) + @coder = coder @identifiers = [ :current_user ] @current_user = user @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @server = TestServer.new + @server = TestServer.new(subscription_adapter: subscription_adapter) @transmissions = [] end - def pubsub - SuccessAdapter.new(server) + def transmit(cable_message) + @transmissions << encode(cable_message) end - def transmit(data) - @transmissions << data + def last_transmission + decode @transmissions.last if @transmissions.any? end - def last_transmission - @transmissions.last + def decode(websocket_message) + @coder.decode websocket_message + end + + def encode(cable_message) + @coder.encode cable_message end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 9e860825f3..b86f422a13 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -2,22 +2,26 @@ require 'ostruct' class TestServer include ActionCable::Server::Connections + include ActionCable::Server::Broadcasting - attr_reader :logger, :config + attr_reader :logger, :config, :mutex - def initialize + def initialize(subscription_adapter: SuccessAdapter) @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) + + @config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter) @config.use_faye = ENV['FAYE'].present? @config.client_socket_class = if @config.use_faye ActionCable::Connection::FayeClientSocket else ActionCable::Connection::ClientSocket end + + @mutex = Monitor.new end def pubsub - @config.subscription_adapter.new(self) + @pubsub ||= @config.subscription_adapter.new(self) end def event_loop diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 82f0abbbf3..285c690df0 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -20,8 +20,7 @@ module CommonSubscriptionAdapterTest end def teardown - @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter - @rx_adapter.shutdown if @rx_adapter + [@rx_adapter, @tx_adapter].uniq.each(&:shutdown) end diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb index 70333e51bd..6d20e6ed78 100644 --- a/actioncable/test/subscription_adapter/evented_redis_test.rb +++ b/actioncable/test/subscription_adapter/evented_redis_test.rb @@ -4,6 +4,17 @@ require_relative './common' class EventedRedisAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest + def setup + super + + # em-hiredis is warning-rich + @previous_verbose, $VERBOSE = $VERBOSE, nil + end + + def teardown + $VERBOSE = @previous_verbose + end + def cable_config { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' } end diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb index 3b4fadbb2a..214352a0b2 100644 --- a/actioncable/test/subscription_adapter/postgresql_test.rb +++ b/actioncable/test/subscription_adapter/postgresql_test.rb @@ -21,6 +21,7 @@ class PostgresqlAdapterTest < ActionCable::TestCase begin ActiveRecord::Base.connection rescue + @rx_adapter = @tx_adapter = nil skip "Couldn't connect to PostgreSQL: #{database_config.inspect}" end diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 030362d512..0a9ee7ce77 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -2,11 +2,13 @@ require 'action_cable' require 'active_support/testing/autorun' require 'puma' - require 'mocha/setup' - require 'rack/mock' -require 'active_support/core_ext/hash/indifferent_access' + +begin + require 'byebug' +rescue LoadError +end # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } @@ -47,10 +49,7 @@ end module ConcurrentRubyConcurrencyHelpers def wait_for_async - e = Concurrent.global_io_executor - until e.completed_task_count == e.scheduled_task_count - sleep 0.1 - end + wait_for_executor Concurrent.global_io_executor end def run_in_eventmachine @@ -65,4 +64,10 @@ class ActionCable::TestCase < ActiveSupport::TestCase else include ConcurrentRubyConcurrencyHelpers end + + def wait_for_executor(executor) + until executor.completed_task_count == executor.scheduled_task_count + sleep 0.1 + end + end end diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb index 654f49821e..e2c81fe312 100644 --- a/actioncable/test/worker_test.rb +++ b/actioncable/test/worker_test.rb @@ -33,22 +33,12 @@ class WorkerTest < ActiveSupport::TestCase end test "invoke" do - @worker.invoke @receiver, :run + @worker.invoke @receiver, :run, connection: @receiver.connection assert_equal :run, @receiver.last_action end test "invoke with arguments" do - @worker.invoke @receiver, :process, "Hello" + @worker.invoke @receiver, :process, "Hello", connection: @receiver.connection assert_equal [ :process, "Hello" ], @receiver.last_action end - - test "running periodic timers with a proc" do - @worker.run_periodic_timer @receiver, @receiver.method(:run) - assert_equal :run, @receiver.last_action - end - - test "running periodic timers with a method" do - @worker.run_periodic_timer @receiver, :run - assert_equal :run, @receiver.last_action - end end |