diff options
Diffstat (limited to 'actioncable')
66 files changed, 774 insertions, 684 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index cc15e9bf61..d70d32ce07 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,9 +1,29 @@ +* Permit same-origin connections by default. + + New option `config.action_cable.allow_same_origin_as_host = false` + to disable. + + *Dávid Halász*, *Matthew Draper* + +* Prevent race where the client could receive and act upon a + subscription confirmation before the channel's `subscribed` method + completed. + + Fixes #25381. + + *Vladimir Dementyev* + +* Buffer writes to websocket connections, to avoid blocking threads + that could be doing more useful things. + + *Matthew Draper*, *Tinco Andringa* + * Protect against concurrent writes to a websocket connection from multiple threads; the underlying OS write is not always threadsafe. *Tinco Andringa* -* Add ActiveSupport::Notifications hook to Broadcaster#broadcast +* Add ActiveSupport::Notifications hook to Broadcaster#broadcast. *Matthew Wear* diff --git a/actioncable/README.md b/actioncable/README.md index 28e2602cbf..8ad9aeb1f1 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -167,7 +167,7 @@ App.cable.subscriptions.create "AppearanceChannel", buttonSelector = "[data-behavior~=appear_away]" install: -> - $(document).on "page:change.appearance", => + $(document).on "turbolinks:load.appearance", => @appear() $(document).on "click.appearance", buttonSelector, => @@ -326,7 +326,10 @@ Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml" ### Allowed Request Origins -Action Cable will only accept requests from specified origins, which are passed to the server config as an array. The origins can be instances of strings or regular expressions, against which a check for match will be performed. +Action Cable will only accept requests from specific origins. + +By default, only an origin matching the cable server itself will be permitted. +Additional origins can be specified using strings or regular expressions, provided in an array. ```ruby Rails.application.config.action_cable.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/] @@ -334,12 +337,19 @@ Rails.application.config.action_cable.allowed_request_origins = ['http://rubyonr When running in the development environment, this defaults to "http://localhost:3000". -To disable and allow requests from any origin: +To disable protection and allow requests from any origin: ```ruby Rails.application.config.action_cable.disable_request_forgery_protection = true ``` +To disable automatic access for same-origin requests, and strictly allow +only the configured origins: + +```ruby +Rails.application.config.action_cable.allow_same_origin_as_host = false +``` + ### Consumer Configuration Once you have decided how to run your cable server (see below), you must provide the server URL (or path) to your client-side setup. diff --git a/actioncable/Rakefile b/actioncable/Rakefile index a72142deb5..87d443919c 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -1,13 +1,12 @@ -require 'rake/testtask' -require 'pathname' -require 'action_cable' -require 'blade' +require "rake/testtask" +require "pathname" +require "action_cable" dir = File.dirname(__FILE__) -task :default => :test +task default: :test -task :package => "assets:compile" +task package: "assets:compile" Rake::TestTask.new do |t| t.libs << "test" @@ -20,12 +19,13 @@ end namespace :test do task :isolated do Dir.glob("test/**/*_test.rb").all? do |file| - sh(Gem.ruby, '-w', '-Ilib:test', file) - end or raise "Failures" + sh(Gem.ruby, "-w", "-Ilib:test", file) + end || raise("Failures") end task :integration do - if ENV['CI'] + require "blade" + if ENV["CI"] Blade.start(interface: :ci) else Blade.start(interface: :runner) @@ -36,6 +36,7 @@ end namespace :assets do desc "Compile Action Cable assets" task :compile do + require "blade" Blade.build end end diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index c65ff7871f..e7f91d0e34 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -1,25 +1,25 @@ -version = File.read(File.expand_path('../../RAILS_VERSION', __FILE__)).strip +version = File.read(File.expand_path("../../RAILS_VERSION", __FILE__)).strip Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY - s.name = 'actioncable' + s.name = "actioncable" s.version = version - s.summary = 'WebSocket framework for Rails.' - s.description = 'Structure many real-time application concerns into channels over a single WebSocket connection.' + s.summary = "WebSocket framework for Rails." + s.description = "Structure many real-time application concerns into channels over a single WebSocket connection." - s.required_ruby_version = '>= 2.2.2' + s.required_ruby_version = ">= 2.2.2" - s.license = 'MIT' + s.license = "MIT" - s.author = ['Pratik Naik', 'David Heinemeier Hansson'] - s.email = ['pratiknaik@gmail.com', 'david@loudthinking.com'] - s.homepage = 'http://rubyonrails.org' + s.author = ["Pratik Naik", "David Heinemeier Hansson"] + s.email = ["pratiknaik@gmail.com", "david@loudthinking.com"] + s.homepage = "http://rubyonrails.org" - s.files = Dir['CHANGELOG.md', 'MIT-LICENSE', 'README.md', 'lib/**/*'] - s.require_path = 'lib' + s.files = Dir["CHANGELOG.md", "MIT-LICENSE", "README.md", "lib/**/*"] + s.require_path = "lib" - s.add_dependency 'actionpack', version + s.add_dependency "actionpack", version - s.add_dependency 'nio4r', '~> 1.2' - s.add_dependency 'websocket-driver', '~> 0.6.1' + s.add_dependency "nio4r", "~> 1.2" + s.add_dependency "websocket-driver", "~> 0.6.1" end diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index b6d2842867..d353716636 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -21,21 +21,21 @@ # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #++ -require 'active_support' -require 'active_support/rails' -require 'action_cable/version' +require "active_support" +require "active_support/rails" +require "action_cable/version" module ActionCable extend ActiveSupport::Autoload INTERNAL = { message_types: { - welcome: 'welcome'.freeze, - ping: 'ping'.freeze, - confirmation: 'confirm_subscription'.freeze, - rejection: 'reject_subscription'.freeze + welcome: "welcome".freeze, + ping: "ping".freeze, + confirmation: "confirm_subscription".freeze, + rejection: "reject_subscription".freeze }, - default_mount_path: '/cable'.freeze, + default_mount_path: "/cable".freeze, protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze } diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 845b747fc5..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -1,4 +1,4 @@ -require 'set' +require "set" module ActionCable module Channel @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -177,7 +189,6 @@ module ActionCable end end - protected # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams # you want this channel to be sending to the subscriber. @@ -202,12 +213,18 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -231,24 +248,12 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) - (data['action'].presence || :receive).to_sym + (data["action"].presence || :receive).to_sym end def processable_action?(action) - self.class.action_methods.include?(action.to_s) + self.class.action_methods.include?(action.to_s) unless subscription_rejected? end def dispatch_action(action, data) @@ -263,7 +268,7 @@ module ActionCable def action_signature(action, data) "#{self.class.name}##{action}".tap do |signature| - if (arguments = data.except('action')).any? + if (arguments = data.except("action")).any? signature << "(#{arguments.inspect})" end end diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb index afc23d7d1a..23ed4ec943 100644 --- a/actioncable/lib/action_cable/channel/broadcasting.rb +++ b/actioncable/lib/action_cable/channel/broadcasting.rb @@ -1,4 +1,4 @@ -require 'active_support/core_ext/object/to_param' +require "active_support/core_ext/object/to_param" module ActionCable module Channel @@ -16,7 +16,7 @@ module ActionCable def broadcasting_for(model) #:nodoc: case when model.is_a?(Array) - model.map { |m| broadcasting_for(m) }.join(':') + model.map { |m| broadcasting_for(m) }.join(":") when model.respond_to?(:to_gid_param) model.to_gid_param else diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb index 295d750e86..c740132c94 100644 --- a/actioncable/lib/action_cable/channel/callbacks.rb +++ b/actioncable/lib/action_cable/channel/callbacks.rb @@ -1,4 +1,4 @@ -require 'active_support/callbacks' +require "active_support/callbacks" module ActionCable module Channel diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index 8e1b2a4af0..b565cb3cac 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -12,7 +12,7 @@ module ActionCable # Chats::AppearancesChannel.channel_name # => 'chats:appearances' # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name - @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore + @channel_name ||= name.sub(/Channel$/, "").gsub("::", ":").underscore end end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 41511312fc..c9daa0bcd3 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -30,7 +30,7 @@ module ActionCable def periodically(callback_or_method_name = nil, every:, &block) callback = if block_given? - raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + raise ArgumentError, "Pass a block or provide a callback arg, not both" if callback_or_method_name block else case callback_or_method_name diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -69,8 +69,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end @@ -94,8 +94,8 @@ module ActionCable # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. # - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 5f813cf8e0..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,8 +8,6 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel - autoload :FayeClientSocket - autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb index 070a70e4e2..85df206445 100644 --- a/actioncable/lib/action_cable/connection/authorization.rb +++ b/actioncable/lib/action_cable/connection/authorization.rb @@ -10,4 +10,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 75c1299e36..dfee123ea2 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -1,4 +1,4 @@ -require 'action_dispatch' +require "action_dispatch" module ActionCable module Connection @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -105,14 +105,14 @@ module ActionCable worker_pool.async_invoke(self, method, *arguments) end - # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. + # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>. # This can be returned by a health check against the connection. def statistics { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers, - request_id: @env['action_dispatch.request_id'] + request_id: @env["action_dispatch.request_id"] } end @@ -195,7 +195,10 @@ module ActionCable def allow_request_origin? return true if server.config.disable_request_forgery_protection - if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] } + proto = Rack::Request.new(env).ssl? ? "https" : "http" + if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}" + true + elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } true else logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") @@ -213,7 +216,7 @@ module ActionCable logger.error invalid_request_message logger.info finished_request_message - [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] + [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ] end # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. @@ -226,7 +229,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', + websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_s ] end @@ -234,19 +237,19 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', + websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_s ] end def invalid_request_message - 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end def successful_request_message - 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 6f29f32ea9..70a2bbecb1 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -1,4 +1,4 @@ -require 'websocket/driver' +require "websocket/driver" module ActionCable module Connection @@ -8,16 +8,16 @@ module ActionCable # Copyright (c) 2010-2015 James Coglan class ClientSocket # :nodoc: def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' + scheme = secure_request?(env) ? "wss:" : "ws:" "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" end def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' + return true if env["HTTPS"] == "on" + return true if env["HTTP_X_FORWARDED_SSL"] == "on" + return true if env["HTTP_X_FORWARDED_SCHEME"] == "https" + return true if env["HTTP_X_FORWARDED_PROTO"] == "https" + return true if env["rack.url_scheme"] == "https" return false end @@ -37,7 +37,7 @@ module ActionCable @url = ClientSocket.determine_url(@env) @driver = @driver_started = nil - @close_params = ['', 1006] + @close_params = ["", 1006] @ready_state = CONNECTING @@ -56,7 +56,7 @@ module ActionCable return if @driver.nil? || @driver_started @stream.hijack_rack_socket - if callback = @env['async.callback'] + if callback = @env["async.callback"] callback.call([101, {}, @stream]) end @@ -78,18 +78,18 @@ module ActionCable def transmit(message) return false if @ready_state > OPEN case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) else false end end def close(code = nil, reason = nil) code ||= 1000 - reason ||= '' + reason ||= "" - unless code == 1000 or (code >= 3000 and code <= 4999) + unless code == 1000 || (code >= 3000 && code <= 4999) raise ArgumentError, "Failed to execute 'close' on WebSocket: " + "The code must be either 1000, or between 3000 and 4999. " + "#{code} is neither." diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb deleted file mode 100644 index a4bfe7db17..0000000000 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ /dev/null @@ -1,48 +0,0 @@ -require 'faye/websocket' - -module ActionCable - module Connection - class FayeClientSocket - def initialize(env, event_target, stream_event_loop, protocols) - @env = env - @event_target = event_target - @protocols = protocols - - @faye = nil - end - - def alive? - @faye && @faye.ready_state == Faye::WebSocket::API::OPEN - end - - def transmit(data) - connect - @faye.send data - end - - def close - @faye && @faye.close - end - - def protocol - @faye && @faye.protocol - end - - def rack_response - connect - @faye.rack_response - end - - private - def connect - return if @faye - @faye = Faye::WebSocket.new(@env, @protocols) - - @faye.on(:open) { |event| @event_target.on_open } - @faye.on(:message) { |event| @event_target.on_message(event.data) } - @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } - @faye.on(:error) { |event| @event_target.on_error(event.message) } - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb deleted file mode 100644 index 9c44b38bc3..0000000000 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ /dev/null @@ -1,44 +0,0 @@ -require 'thread' - -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -module ActionCable - module Connection - class FayeEventLoop - @@mutex = Mutex.new - - def timer(interval, &block) - ensure_reactor_running - EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) - end - - def post(task = nil, &block) - task ||= block - - ensure_reactor_running - ::EM.next_tick(&task) - end - - private - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end - - class EMTimer - def initialize(inner) - @inner = inner - end - - def shutdown - @inner.cancel - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 4a54044aff..c91a1d1fd7 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -1,4 +1,4 @@ -require 'set' +require "set" module ActionCable module Connection diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index f70d52f99b..8f0ec766c3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -27,8 +27,8 @@ module ActionCable end def process_internal_message(message) - case message['type'] - when 'disconnect' + case message["type"] + when "disconnect" logger.info "Removing connection (#{connection_identifier})" websocket.close end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 5695623859..e620b93845 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -1,4 +1,4 @@ -require 'thread' +require "thread" module ActionCable module Connection @@ -10,10 +10,13 @@ module ActionCable def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket - @stream_send = socket.env['stream.send'] + @stream_send = socket.env["stream.send"] @rack_hijack_io = nil @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -30,23 +33,71 @@ module ActionCable end def write(data) - @write_lock.synchronize do - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + return unless @socket_object.env["rack.hijack"] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + @socket_object.env["rack.hijack"].call + @rack_hijack_io = @socket_object.env["rack.hijack_io"] @event_loop.attach(@rack_hijack_io, self) end @@ -55,7 +106,6 @@ module ActionCable def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io.close @rack_hijack_io = nil end end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 2abad09c03..2d1af0ff9f 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -1,11 +1,11 @@ -require 'nio' -require 'thread' +require "nio" +require "thread" module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +20,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -35,6 +36,16 @@ module ActionCable @todo << lambda do @nio.deregister io @map.delete io + io.close + end + wakeup + end + + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end end wakeup end @@ -52,6 +63,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +95,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 6051818bfb..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -1,4 +1,4 @@ -require 'active_support/core_ext/hash/indifferent_access' +require "active_support/core_ext/hash/indifferent_access" module ActionCable module Connection @@ -11,10 +11,10 @@ module ActionCable end def execute_command(data) - case data['command'] - when 'subscribe' then add data - when 'unsubscribe' then remove data - when 'message' then perform_action data + case data["command"] + when "subscribe" then add data + when "unsubscribe" then remove data + when "message" then perform_action data else logger.error "Received unrecognized command in #{data.inspect}" end @@ -23,13 +23,17 @@ module ActionCable end def add(data) - id_key = data['identifier'] + id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end @@ -37,7 +41,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[data['identifier']] + remove_subscription subscriptions[data["identifier"]] end def remove_subscription(subscription) @@ -46,7 +50,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action ActiveSupport::JSON.decode(data['data']) + find(data).perform_action ActiveSupport::JSON.decode(data["data"]) end def identifiers @@ -64,7 +68,7 @@ module ActionCable delegate :logger, to: :connection def find(data) - if subscription = subscriptions[data['identifier']] + if subscription = subscriptions[data["identifier"]] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 11f28c37e8..382141b89f 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,11 @@ -require 'websocket/driver' +require "websocket/driver" module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 34f9952c71..e23527b84e 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -32,9 +32,9 @@ module ActionCable end previous_connection_class = self.connection_class - self.connection_class = -> { 'ApplicationCable::Connection'.safe_constantize || previous_connection_class.call } + self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call } - options.each { |k,v| send("#{k}=", v) } + options.each { |k, v| send("#{k}=", v) } end end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index a528024427..d2856bc6ae 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -41,7 +41,7 @@ module ActionCable # Uses the internal channel to disconnect the connection. def disconnect - server.broadcast internal_channel, type: 'disconnect' + server.broadcast internal_channel, type: "disconnect" end # Returns all the identifiers that were applied to this connection. @@ -54,7 +54,7 @@ module ActionCable def set_identifier_instance_vars(ids) raise InvalidIdentifiersError unless valid_identifiers?(ids) - ids.each { |k,v| instance_variable_set("@#{k}", v) } + ids.each { |k, v| instance_variable_set("@#{k}", v) } end def valid_identifiers?(ids) diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3..22f9353825 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -9,7 +9,7 @@ module ActionCable autoload :Configuration autoload :Worker - autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' + autoload :ActiveRecordConnectionManagement, "action_cable/server/worker/active_record_connection_management" end end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 0ad1e408a9..419eccd73c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,4 +1,4 @@ -require 'monitor' +require "monitor" module ActionCable module Server @@ -37,9 +37,13 @@ module ActionCable connections.each(&:close) @mutex.synchronize do - worker_pool.halt if @worker_pool - + # Shutdown the worker pool + @worker_pool.halt if @worker_pool @worker_pool = nil + + # Shutdown the pub/sub adapter + @pubsub.shutdown if @pubsub + @pubsub = nil end end @@ -49,12 +53,12 @@ module ActionCable end def event_loop - @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out - # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`. + # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>. # # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index ada1ac22cc..17e0dee064 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,8 +4,8 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :use_faye, :connection_class, :worker_pool_size - attr_accessor :disable_request_forgery_protection, :allowed_request_origins + attr_accessor :connection_class, :worker_pool_size + attr_accessor :disable_request_forgery_protection, :allowed_request_origins, :allow_same_origin_as_host attr_accessor :cable, :url, :mount_path def initialize @@ -15,13 +15,14 @@ module ActionCable @worker_pool_size = 4 @disable_request_forgery_protection = false + @allow_same_origin_as_host = true end # Returns constant of subscription adapter specified in config/cable.yml. # If the adapter cannot be found, this will default to the Redis adapter. # Also makes sure proper dependencies are required. def pubsub_adapter - adapter = (cable.fetch('adapter') { 'redis' }) + adapter = (cable.fetch("adapter") { "redis" }) path_to_adapter = "action_cable/subscription_adapter/#{adapter}" begin require path_to_adapter @@ -32,25 +33,9 @@ module ActionCable end adapter = adapter.camelize - adapter = 'PostgreSQL' if adapter == 'Postgresql' + adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end - - def event_loop_class - if use_faye - ActionCable::Connection::FayeEventLoop - else - ActionCable::Connection::StreamEventLoop - end - end - - def client_socket_class - if use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - end end end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index f3a4fc5a5b..43639c27af 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,6 +1,6 @@ -require 'active_support/callbacks' -require 'active_support/core_ext/module/attribute_accessors_per_thread' -require 'concurrent' +require "active_support/callbacks" +require "active_support/core_ext/module/attribute_accessors_per_thread" +require "concurrent" module ActionCable module Server @@ -25,7 +25,7 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @executor.kill + @executor.shutdown end def stopping? diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 10b3ac8cd8..46819dbfec 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -1,4 +1,4 @@ -require 'action_cable/subscription_adapter/inline' +require "action_cable/subscription_adapter/inline" module ActionCable module SubscriptionAdapter diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index 4735a4bfa8..bcd46d2a0e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -1,9 +1,9 @@ -require 'thread' +require "thread" -gem 'em-hiredis', '~> 0.3.0' -gem 'redis', '~> 3.0' -require 'em-hiredis' -require 'redis' +gem "em-hiredis", "~> 0.3.0" +gem "redis", "~> 3.0" +require "em-hiredis" +require "redis" EventMachine.epoll if EventMachine.epoll? EventMachine.kqueue if EventMachine.kqueue? diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 66c7852f6e..bdab5205ec 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -1,6 +1,6 @@ -gem 'pg', '~> 0.18' -require 'pg' -require 'thread' +gem "pg", "~> 0.18" +require "pg" +require "thread" module ActionCable module SubscriptionAdapter @@ -33,7 +33,7 @@ module ActionCable pg_conn = ar_conn.raw_connection unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter" end yield pg_conn diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 65434f7107..62bd284a6b 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,7 +1,7 @@ -require 'thread' +require "thread" -gem 'redis', '~> 3.0' -require 'redis' +gem "redis", "~> 3.0" +require "redis" module ActionCable module SubscriptionAdapter @@ -72,7 +72,7 @@ module ActionCable conn.without_reconnect do original_client = conn.client - conn.subscribe('_action_cable_internal') do |on| + conn.subscribe("_action_cable_internal") do |on| on.subscribe do |chan, count| @subscription_lock.synchronize do if count == 1 @@ -111,7 +111,7 @@ module ActionCable return if @thread.nil? when_connected do - send_command('unsubscribe') + send_command("unsubscribe") @raw_client = nil end end @@ -123,13 +123,13 @@ module ActionCable @subscription_lock.synchronize do ensure_listener_running @subscribe_callbacks[channel] << on_success - when_connected { send_command('subscribe', channel) } + when_connected { send_command("subscribe", channel) } end end def remove_channel(channel) @subscription_lock.synchronize do - when_connected { send_command('unsubscribe', channel) } + when_connected { send_command("unsubscribe", channel) } end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb index 4ec513e3ba..4cce86dcca 100644 --- a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -2,7 +2,7 @@ module ActionCable module SubscriptionAdapter class SubscriberMap def initialize - @subscribers = Hash.new { |h,k| h[k] = [] } + @subscribers = Hash.new { |h, k| h[k] = [] } @sync = Mutex.new end diff --git a/actioncable/lib/action_cable/version.rb b/actioncable/lib/action_cable/version.rb index e17877202b..d6081409f0 100644 --- a/actioncable/lib/action_cable/version.rb +++ b/actioncable/lib/action_cable/version.rb @@ -1,4 +1,4 @@ -require_relative 'gem_version' +require_relative "gem_version" module ActionCable # Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt> diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index 47232252e3..20d807c033 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -10,14 +10,14 @@ module Rails check_class_collision suffix: "Channel" def create_channel_file - template "channel.rb", File.join('app/channels', class_path, "#{file_name}_channel.rb") + template "channel.rb", File.join("app/channels", class_path, "#{file_name}_channel.rb") if options[:assets] if self.behavior == :invoke template "assets/cable.js", "app/assets/javascripts/cable.js" end - js_template "assets/channel", File.join('app/assets/javascripts/channels', class_path, "#{file_name}") + js_template "assets/channel", File.join("app/assets/javascripts/channels", class_path, "#{file_name}") end generate_application_cable_files @@ -25,7 +25,7 @@ module Rails protected def file_name - @_file_name ||= super.gsub(/_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, "") end # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. @@ -33,12 +33,12 @@ module Rails return if self.behavior != :invoke files = [ - 'application_cable/channel.rb', - 'application_cable/connection.rb' + "application_cable/channel.rb", + "application_cable/connection.rb" ] files.each do |name| - path = File.join('app/channels/', name) + path = File.join("app/channels/", name) template(name, path) if !File.exist?(path) end end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index daa782eeb3..9a3a3581e6 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_connection' -require 'stubs/room' +require "test_helper" +require "stubs/test_connection" +require "stubs/room" class ActionCable::Channel::BaseTest < ActiveSupport::TestCase class ActionCable::Channel::Base @@ -58,7 +58,7 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end def get_latest - transmit data: 'latest' + transmit data: "latest" end def receive @@ -74,14 +74,16 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase setup do @user = User.new "lifo" @connection = TestConnection.new(@user) - @channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } + @channel = ChatChannel.new @connection, "{id: 1}", id: 1 end - test "should subscribe to a channel on initialize" do + test "should subscribe to a channel" do + @channel.subscribe_to_channel assert_equal 1, @channel.room.id end test "on subscribe callbacks" do + @channel.subscribe_to_channel assert @channel.subscribed end @@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "unsubscribing from a channel" do + @channel.subscribe_to_channel + assert @channel.room assert @channel.subscribed? @@ -104,54 +108,59 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "callable action without any argument" do - @channel.perform_action 'action' => :leave + @channel.perform_action "action" => :leave assert_equal [ :leave ], @channel.last_action end test "callable action with arguments" do - data = { 'action' => :speak, 'content' => "Hello World" } + data = { "action" => :speak, "content" => "Hello World" } @channel.perform_action data assert_equal [ :speak, data ], @channel.last_action end test "should not dispatch a private method" do - @channel.perform_action 'action' => :rm_rf + @channel.perform_action "action" => :rm_rf assert_nil @channel.last_action end test "should not dispatch a public method defined on Base" do - @channel.perform_action 'action' => :kick + @channel.perform_action "action" => :kick assert_nil @channel.last_action end test "should dispatch a public method defined on Base and redefined on channel" do - data = { 'action' => :topic, 'content' => "This is Sparta!" } + data = { "action" => :topic, "content" => "This is Sparta!" } @channel.perform_action data assert_equal [ :topic, data ], @channel.last_action end test "should dispatch calling a public method defined in an ancestor" do - @channel.perform_action 'action' => :chatters + @channel.perform_action "action" => :chatters assert_equal [ :chatters ], @channel.last_action end test "should dispatch receive action when perform_action is called with empty action" do - data = { 'content' => 'hello' } + data = { "content" => "hello" } @channel.perform_action data assert_equal [ :receive ], @channel.last_action end test "transmitting data" do - @channel.perform_action 'action' => :get_latest + @channel.perform_action "action" => :get_latest - expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" }} + expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" } } assert_equal expected, @connection.last_transmission end - test "subscription confirmation" do + test "do not send subscription confirmation on initialize" do + assert_nil @connection.last_transmission + end + + test "subscription confirmation on subscribe_to_channel" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + @channel.subscribe_to_channel assert_equal expected, @connection.last_transmission end @@ -162,7 +171,7 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "invalid action on Channel" do assert_logged("Unable to process ActionCable::Channel::BaseTest::ChatChannel#invalid_action") do - @channel.perform_action 'action' => :invalid_action + @channel.perform_action "action" => :invalid_action end end @@ -173,12 +182,12 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase events << ActiveSupport::Notifications::Event.new(*args) end - data = {'action' => :speak, 'content' => 'hello'} + data = { "action" => :speak, "content" => "hello" } @channel.perform_action data assert_equal 1, events.length - assert_equal 'perform_action.action_cable', events[0].name - assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal "perform_action.action_cable", events[0].name + assert_equal "ActionCable::Channel::BaseTest::ChatChannel", events[0].payload[:channel_class] assert_equal :speak, events[0].payload[:action] assert_equal data, events[0].payload[:data] ensure @@ -189,27 +198,29 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit" do begin events = [] - ActiveSupport::Notifications.subscribe 'transmit.action_cable' do |*args| + ActiveSupport::Notifications.subscribe "transmit.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) end - @channel.perform_action 'action' => :get_latest - expected_data = {data: 'latest'} + @channel.perform_action "action" => :get_latest + expected_data = { data: "latest" } assert_equal 1, events.length - assert_equal 'transmit.action_cable', events[0].name - assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal "transmit.action_cable", events[0].name + assert_equal "ActionCable::Channel::BaseTest::ChatChannel", events[0].payload[:channel_class] assert_equal expected_data, events[0].payload[:data] assert_nil events[0].payload[:via] ensure - ActiveSupport::Notifications.unsubscribe 'transmit.action_cable' + ActiveSupport::Notifications.unsubscribe "transmit.action_cable" end end test "notification for transmit_subscription_confirmation" do begin + @channel.subscribe_to_channel + events = [] - ActiveSupport::Notifications.subscribe 'transmit_subscription_confirmation.action_cable' do |*args| + ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) end @@ -217,27 +228,27 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase @channel.send(:transmit_subscription_confirmation) assert_equal 1, events.length - assert_equal 'transmit_subscription_confirmation.action_cable', events[0].name - assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal "transmit_subscription_confirmation.action_cable", events[0].name + assert_equal "ActionCable::Channel::BaseTest::ChatChannel", events[0].payload[:channel_class] ensure - ActiveSupport::Notifications.unsubscribe 'transmit_subscription_confirmation.action_cable' + ActiveSupport::Notifications.unsubscribe "transmit_subscription_confirmation.action_cable" end end test "notification for transmit_subscription_rejection" do begin events = [] - ActiveSupport::Notifications.subscribe 'transmit_subscription_rejection.action_cable' do |*args| + ActiveSupport::Notifications.subscribe "transmit_subscription_rejection.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) end @channel.send(:transmit_subscription_rejection) assert_equal 1, events.length - assert_equal 'transmit_subscription_rejection.action_cable', events[0].name - assert_equal 'ActionCable::Channel::BaseTest::ChatChannel', events[0].payload[:channel_class] + assert_equal "transmit_subscription_rejection.action_cable", events[0].name + assert_equal "ActionCable::Channel::BaseTest::ChatChannel", events[0].payload[:channel_class] ensure - ActiveSupport::Notifications.unsubscribe 'transmit_subscription_rejection.action_cable' + ActiveSupport::Notifications.unsubscribe "transmit_subscription_rejection.action_cable" end end diff --git a/actioncable/test/channel/broadcasting_test.rb b/actioncable/test/channel/broadcasting_test.rb index 1de04243e5..3476c1db31 100644 --- a/actioncable/test/channel/broadcasting_test.rb +++ b/actioncable/test/channel/broadcasting_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_connection' -require 'stubs/room' +require "test_helper" +require "stubs/test_connection" +require "stubs/room" class ActionCable::Channel::BroadcastingTest < ActiveSupport::TestCase class ChatChannel < ActionCable::Channel::Base @@ -11,7 +11,7 @@ class ActionCable::Channel::BroadcastingTest < ActiveSupport::TestCase end test "broadcasts_to" do - ActionCable.stubs(:server).returns mock().tap { |m| m.expects(:broadcast).with('action_cable:channel:broadcasting_test:chat:Room#1-Campfire', "Hello World") } + ActionCable.stubs(:server).returns mock().tap { |m| m.expects(:broadcast).with("action_cable:channel:broadcasting_test:chat:Room#1-Campfire", "Hello World") } ChatChannel.broadcast_to(Room.new(1), "Hello World") end diff --git a/actioncable/test/channel/naming_test.rb b/actioncable/test/channel/naming_test.rb index 89ef6ad8b0..08f0e7be48 100644 --- a/actioncable/test/channel/naming_test.rb +++ b/actioncable/test/channel/naming_test.rb @@ -1,4 +1,4 @@ -require 'test_helper' +require "test_helper" class ActionCable::Channel::NamingTest < ActiveSupport::TestCase class ChatChannel < ActionCable::Channel::Base diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 03464003cf..0cc4992ef6 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -1,7 +1,7 @@ -require 'test_helper' -require 'stubs/test_connection' -require 'stubs/room' -require 'active_support/time' +require "test_helper" +require "stubs/test_connection" +require "stubs/room" +require "active_support/time" class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase class ChatChannel < ActionCable::Channel::Base @@ -32,26 +32,26 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase timers.each_with_index do |timer, i| assert_kind_of Proc, timer[0] - assert_equal i+1, timer[1][:every] + assert_equal i + 1, timer[1][:every] end end - test 'disallow negative and zero periods' do - [ 0, 0.0, 0.seconds, -1, -1.seconds, 'foo', :foo, Object.new ].each do |invalid| + test "disallow negative and zero periods" do + [ 0, 0.0, 0.seconds, -1, -1.seconds, "foo", :foo, Object.new ].each do |invalid| assert_raise ArgumentError, /Expected every:/ do ChatChannel.periodically :send_updates, every: invalid end end end - test 'disallow block and arg together' do + test "disallow block and arg together" do assert_raise ArgumentError, /not both/ do ChatChannel.periodically(:send_updates, every: 1) { ping } end end - test 'disallow unknown args' do - [ 'send_updates', Object.new, nil ].each do |invalid| + test "disallow unknown args" do + [ "send_updates", Object.new, nil ].each do |invalid| assert_raise ArgumentError, /Expected a Symbol/ do ChatChannel.periodically invalid, every: 1 end @@ -60,8 +60,9 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase test "timer start and stop" do @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) - channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } + channel = ChatChannel.new @connection, "{id: 1}", id: 1 + channel.subscribe_to_channel channel.unsubscribe_from_channel assert_equal [], channel.send(:active_periodic_timers) end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index 15db57d6ba..99c4a7603a 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -1,12 +1,15 @@ -require 'test_helper' -require 'stubs/test_connection' -require 'stubs/room' +require "test_helper" +require "stubs/test_connection" +require "stubs/room" class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase class SecretChannel < ActionCable::Channel::Base def subscribed reject if params[:id] > 0 end + + def secret_action + end end setup do @@ -16,10 +19,23 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "subscription rejection" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } - @channel = SecretChannel.new @connection, "{id: 1}", { id: 1 } + @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission end + test "does not execute action if subscription is rejected" do + @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } + @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel + + expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } + assert_equal expected, @connection.last_transmission + assert_equal 1, @connection.transmissions.size + + @channel.perform_action("action" => :secret_action) + assert_equal 1, @connection.transmissions.size + end end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 38543920d3..31dcde2e29 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_connection' -require 'stubs/room' +require "test_helper" +require "stubs/test_connection" +require "stubs/room" module ActionCable::StreamTests class Connection < ActionCable::Connection::Base @@ -25,11 +25,11 @@ module ActionCable::StreamTests private def pick_coder(coder) case coder - when nil, 'json' + when nil, "json" ActiveSupport::JSON - when 'custom' + when "custom" DummyEncoder - when 'none' + when "none" nil end end @@ -38,7 +38,7 @@ module ActionCable::StreamTests module DummyEncoder extend self def encode(*) '{ "foo": "encoded" }' end - def decode(*) { foo: 'decoded' } end + def decode(*) { foo: "decoded" } end end class SymbolChannel < ActionCable::Channel::Base @@ -52,7 +52,8 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = ChatChannel.new connection, "{id: 1}", { id: 1 } + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -64,6 +65,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = SymbolChannel.new connection, "" + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -76,6 +78,7 @@ module ActionCable::StreamTests connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" + channel.subscribe_to_channel channel.stream_for Room.new(1) end end @@ -84,7 +87,9 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", { id: 1 } + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel + assert_nil connection.last_transmission wait_for_async @@ -114,7 +119,7 @@ module ActionCable::StreamTests end end - require 'action_cable/subscription_adapter/inline' + require "action_cable/subscription_adapter/async" class UserCallbackChannel < ActionCable::Channel::Base def subscribed @@ -124,19 +129,26 @@ module ActionCable::StreamTests end end - class StreamEncodingTest < ActionCable::TestCase + class MultiChatChannel < ActionCable::Channel::Base + def subscribed + stream_from "main_room" + stream_from "test_all_rooms" + end + end + + class StreamFromTest < ActionCable::TestCase setup do - @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end - test 'custom encoder' do + test "custom encoder" do run_in_eventmachine do connection = open_connection subscribe_to connection, identifiers: { id: 1 } connection.websocket.expects(:transmit) - @server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder + @server.broadcast "test_room_1", { foo: "bar" }, coder: DummyEncoder wait_for_async wait_for_executor connection.server.worker_pool.executor end @@ -145,21 +157,32 @@ module ActionCable::StreamTests test "user supplied callbacks are run through the worker pool" do run_in_eventmachine do connection = open_connection - receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 }) + receive(connection, command: "subscribe", channel: UserCallbackChannel.name, identifiers: { id: 1 }) - @server.broadcast 'channel', {} + @server.broadcast "channel", {} wait_for_async refute Thread.current[:ran_callback], "User callback was not run through the worker pool" end end + test "subscription confirmation should only be sent out once with muptiple stream_from" do + run_in_eventmachine do + connection = open_connection + expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" } + connection.websocket.expects(:transmit).with(expected.to_json) + receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {}) + + wait_for_async + end + end + private def subscribe_to(connection, identifiers:) - receive connection, command: 'subscribe', identifiers: identifiers + receive connection, command: "subscribe", identifiers: identifiers end def open_connection - env = Rack::MockRequest.env_for '/test', 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "HTTP_ORIGIN" => "http://rubyonrails.com" Connection.new(@server, env).tap do |connection| connection.process @@ -170,7 +193,7 @@ module ActionCable::StreamTests end end - def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel') + def receive(connection, command:, identifiers:, channel: "ActionCable::StreamTests::ChatChannel") identifier = JSON.generate(channel: channel, **identifiers) connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier) wait_for_async diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 82d9f12fdd..98a114a5f4 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -1,13 +1,31 @@ -require 'test_helper' -require 'concurrent' +require "test_helper" +require "concurrent" -require 'faye/websocket' -require 'json' +require "websocket-client-simple" +require "json" -require 'active_support/hash_with_indifferent_access' +require "active_support/hash_with_indifferent_access" + +#### +# 😷 Warning suppression 😷 +WebSocket::Frame::Handler::Handler03.prepend Module.new { + def initialize(*) + @application_data_buffer = nil + super + end +} + +WebSocket::Frame::Data.prepend Module.new { + def initialize(*) + @masking_key = nil + super + end +} +# +#### class ClientTest < ActionCable::TestCase - WAIT_WHEN_EXPECTING_EVENT = 8 + WAIT_WHEN_EXPECTING_EVENT = 2 WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 class EchoChannel < ActionCable::Channel::Base @@ -16,20 +34,20 @@ class ClientTest < ActionCable::TestCase end def unsubscribed - 'Goodbye from EchoChannel!' + "Goodbye from EchoChannel!" end def ding(data) - transmit(dong: data['message']) + transmit(dong: data["message"]) end def delay(data) sleep 1 - transmit(dong: data['message']) + transmit(dong: data["message"]) end def bulk(data) - ActionCable.server.broadcast "global", wide: data['message'] + ActionCable.server.broadcast "global", wide: data["message"] end end @@ -38,26 +56,15 @@ class ClientTest < ActionCable::TestCase server = ActionCable.server server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } - server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: 'async') - server.config.use_faye = ENV['FAYE'].present? + server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async") # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true - - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - - # faye-websocket is warning-rich - @previous_verbose, $VERBOSE = $VERBOSE, nil - end - - def teardown - $VERBOSE = @previous_verbose end def with_puma_server(rack_app = ActionCable.server, port = 3099) server = ::Puma::Server.new(rack_app, ::Puma::Events.strings) - server.add_tcp_listener '127.0.0.1', port + server.add_tcp_listener "127.0.0.1", port server.min_threads = 1 server.max_threads = 4 @@ -73,44 +80,49 @@ class ClientTest < ActionCable::TestCase attr_reader :pings def initialize(port) - @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") - @messages = Queue.new - @closed = Concurrent::Event.new - @has_messages = Concurrent::Semaphore.new(0) - @pings = 0 - - open = Concurrent::Event.new - error = nil - - @ws.on(:error) do |event| - if open.set? - @messages << RuntimeError.new(event.message) - else - error = event.message - open.set + messages = @messages = Queue.new + closed = @closed = Concurrent::Event.new + has_messages = @has_messages = Concurrent::Semaphore.new(0) + pings = @pings = Concurrent::AtomicFixnum.new(0) + + open = Concurrent::Promise.new + + @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws| + ws.on(:error) do |event| + event = RuntimeError.new(event.message) unless event.is_a?(Exception) + + if open.pending? + open.fail(event) + else + messages << event + has_messages.release + end end - end - @ws.on(:open) do |event| - open.set - end + ws.on(:open) do |event| + open.set(true) + end - @ws.on(:message) do |event| - message = JSON.parse(event.data) - if message['type'] == 'ping' - @pings += 1 - else - @messages << message - @has_messages.release + ws.on(:message) do |event| + if event.type == :close + closed.set + else + message = JSON.parse(event.data) + if message["type"] == "ping" + pings.increment + else + messages << message + has_messages.release + end + end end - end - @ws.on(:close) do |event| - @closed.set + ws.on(:close) do |event| + closed.set + end end - open.wait(WAIT_WHEN_EXPECTING_EVENT) - raise error if error + open.wait!(WAIT_WHEN_EXPECTING_EVENT) end def read_message @@ -161,76 +173,80 @@ class ClientTest < ActionCable::TestCase end end - def faye_client(port) + def websocket_client(port) SyncClient.new(port) end + def concurrently(enum) + enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!) + end + def test_single_client with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "message" => { "dong" => "hello" } }, c.read_message) c.close end end def test_interacting_clients with_puma_server do |port| - clients = 10.times.map { faye_client(port) } + clients = concurrently(10.times) { websocket_client(port) } barrier_1 = Concurrent::CyclicBarrier.new(clients.size) barrier_2 = Concurrent::CyclicBarrier.new(clients.size) - clients.map {|c| Concurrent::Future.execute { - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) barrier_1.wait WAIT_WHEN_EXPECTING_EVENT - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello') + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello") barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size - } }.each(&:wait!) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_many_clients with_puma_server do |port| - clients = 100.times.map { faye_client(port) } - - clients.map {|c| Concurrent::Future.execute { - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) - } }.each(&:wait!) + clients = concurrently(100.times) { websocket_client(port) } + + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_disappearing_client with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello') + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello") c.close # disappear before write - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) c.close # disappear before read end end @@ -238,17 +254,17 @@ class ClientTest < ActionCable::TestCase def test_unsubscribe_client with_puma_server do |port| app = ActionCable.server - identifier = JSON.generate(channel: 'ClientTest::EchoChannel') + identifier = JSON.generate(channel: "ClientTest::EchoChannel") - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: identifier - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: identifier + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) assert_equal(1, app.connections.count) assert(app.remote_connections.where(identifier: identifier)) subscriptions = app.connections.first.subscriptions.send(:subscriptions) - assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription' + assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription" channel = subscriptions.first[1] channel.expects(:unsubscribed) c.close @@ -261,10 +277,10 @@ class ClientTest < ActionCable::TestCase def test_server_restart with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) ActionCable.server.restart c.wait_for_close diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb index a0506cb9c0..dcdbe9c1d1 100644 --- a/actioncable/test/connection/authorization_test.rb +++ b/actioncable/test/connection/authorization_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -19,8 +19,8 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase server = TestServer.new server.config.allowed_request_origins = %w( http://rubyonrails.com ) - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env = Rack::MockRequest.env_for "/test", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", + "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com" connection = Connection.new(server, env) connection.websocket.expects(:close) diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index d7e1041e68..9bcd0700cf 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_server' -require 'active_support/core_ext/object/json' +require "test_helper" +require "stubs/test_server" +require "active_support/core_ext/object/json" class ActionCable::Connection::BaseTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -113,14 +113,14 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase run_in_eventmachine do class CallMeMaybe def call(*) - raise 'Do not call me!' + raise "Do not call me!" end end env = Rack::MockRequest.env_for( "/test", - { 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new } + "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", + "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.org", "rack.hijack" => CallMeMaybe.new ) connection = ActionCable::Connection::Base.new(@server, env) @@ -131,8 +131,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase private def open_connection - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env = Rack::MockRequest.env_for "/test", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", + "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com" Connection.new(@server, env) end diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb index fe9077ae7f..bc3ff6a3d7 100644 --- a/actioncable/test/connection/client_socket_test.rb +++ b/actioncable/test/connection/client_socket_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -32,42 +32,46 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end - test 'delegate socket errors to on_error handler' do - skip if ENV['FAYE'].present? - + test "delegate socket errors to on_error handler" do run_in_eventmachine do connection = open_connection # Internal hax = :( client = connection.websocket.send(:websocket) - client.instance_variable_get('@stream').expects(:write).raises('foo') + client.instance_variable_get("@stream").expects(:write).raises("foo") client.expects(:client_gone).never - client.write('boo') + client.write("boo") assert_equal %w[ foo ], connection.errors end end - test 'closes hijacked i/o socket at shutdown' do - skip if ENV['FAYE'].present? - + test "closes hijacked i/o socket at shutdown" do run_in_eventmachine do connection = open_connection client = connection.websocket.send(:websocket) - client.instance_variable_get('@stream') - .instance_variable_get('@rack_hijack_io') - .expects(:close) + event = Concurrent::Event.new + client.instance_variable_get("@stream") + .instance_variable_get("@rack_hijack_io") + .define_singleton_method(:close) { event.set } connection.close + event.wait end end private def open_connection - env = Rack::MockRequest.env_for '/test', - 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' - env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + env = Rack::MockRequest.env_for "/test", + "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", + "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com" + io = \ + begin + Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM, 0).first + rescue + StringIO.new + end + env["rack.hijack"] = -> { env["rack.hijack_io"] = io } Connection.new(@server, env).tap do |connection| connection.process diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb index 2d516b0533..37bedfd734 100644 --- a/actioncable/test/connection/cross_site_forgery_test.rb +++ b/actioncable/test/connection/cross_site_forgery_test.rb @@ -1,8 +1,8 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase - HOST = 'rubyonrails.com' + HOST = "rubyonrails.com" class Connection < ActionCable::Connection::Base def send_async(method, *args) @@ -13,44 +13,53 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase setup do @server = TestServer.new @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + @server.config.allow_same_origin_as_host = false end teardown do @server.config.disable_request_forgery_protection = false @server.config.allowed_request_origins = [] + @server.config.allow_same_origin_as_host = true end test "disable forgery protection" do @server.config.disable_request_forgery_protection = true - assert_origin_allowed 'http://rubyonrails.com' - assert_origin_allowed 'http://hax.com' + assert_origin_allowed "http://rubyonrails.com" + assert_origin_allowed "http://hax.com" end test "explicitly specified a single allowed origin" do - @server.config.allowed_request_origins = 'http://hax.com' - assert_origin_not_allowed 'http://rubyonrails.com' - assert_origin_allowed 'http://hax.com' + @server.config.allowed_request_origins = "http://hax.com" + assert_origin_not_allowed "http://rubyonrails.com" + assert_origin_allowed "http://hax.com" end test "explicitly specified multiple allowed origins" do @server.config.allowed_request_origins = %w( http://rubyonrails.com http://www.rubyonrails.com ) - assert_origin_allowed 'http://rubyonrails.com' - assert_origin_allowed 'http://www.rubyonrails.com' - assert_origin_not_allowed 'http://hax.com' + assert_origin_allowed "http://rubyonrails.com" + assert_origin_allowed "http://www.rubyonrails.com" + assert_origin_not_allowed "http://hax.com" end test "explicitly specified a single regexp allowed origin" do @server.config.allowed_request_origins = /.*ha.*/ - assert_origin_not_allowed 'http://rubyonrails.com' - assert_origin_allowed 'http://hax.com' + assert_origin_not_allowed "http://rubyonrails.com" + assert_origin_allowed "http://hax.com" end test "explicitly specified multiple regexp allowed origins" do - @server.config.allowed_request_origins = [/http:\/\/ruby.*/, /.*rai.s.*com/, 'string' ] - assert_origin_allowed 'http://rubyonrails.com' - assert_origin_allowed 'http://www.rubyonrails.com' - assert_origin_not_allowed 'http://hax.com' - assert_origin_not_allowed 'http://rails.co.uk' + @server.config.allowed_request_origins = [/http:\/\/ruby.*/, /.*rai.s.*com/, "string" ] + assert_origin_allowed "http://rubyonrails.com" + assert_origin_allowed "http://www.rubyonrails.com" + assert_origin_not_allowed "http://hax.com" + assert_origin_not_allowed "http://rails.co.uk" + end + + test "allow same origin as host" do + @server.config.allow_same_origin_as_host = true + assert_origin_allowed "http://#{HOST}" + assert_origin_not_allowed "http://hax.com" + assert_origin_not_allowed "http://rails.co.uk" end private @@ -75,7 +84,7 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase end def env_for_origin(origin) - Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST, - 'HTTP_HOST' => HOST, 'HTTP_ORIGIN' => origin + Rack::MockRequest.env_for "/test", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "SERVER_NAME" => HOST, + "HTTP_HOST" => HOST, "HTTP_ORIGIN" => origin end end diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index b48d9af809..a4dfcc06f0 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_server' -require 'stubs/user' +require "test_helper" +require "stubs/test_server" +require "stubs/user" class ActionCable::Connection::IdentifierTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -23,9 +23,9 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase test "should subscribe to internal channel on open and unsubscribe on close" do run_in_eventmachine do - pubsub = mock('pubsub_adapter') - pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) - pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) + pubsub = mock("pubsub_adapter") + pubsub.expects(:subscribe).with("action_cable/User#lifo", kind_of(Proc)) + pubsub.expects(:unsubscribe).with("action_cable/User#lifo", kind_of(Proc)) server = TestServer.new server.stubs(:pubsub).returns(pubsub) @@ -40,7 +40,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close) - @connection.process_internal_message 'type' => 'disconnect' + @connection.process_internal_message "type" => "disconnect" end end @@ -49,20 +49,20 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close).never - @connection.process_internal_message 'type' => 'unknown' + @connection.process_internal_message "type" => "unknown" end end protected def open_connection_with_stubbed_pubsub server = TestServer.new - server.stubs(:adapter).returns(stub_everything('adapter')) + server.stubs(:adapter).returns(stub_everything("adapter")) open_connection server: server end def open_connection(server:) - env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket" @connection = Connection.new(server, env) @connection.process diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index 484e73bb30..67e68355c5 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -1,6 +1,6 @@ -require 'test_helper' -require 'stubs/test_server' -require 'stubs/user' +require "test_helper" +require "stubs/test_server" +require "stubs/user" class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -22,13 +22,13 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase protected def open_connection_with_stubbed_pubsub server = TestServer.new - server.stubs(:pubsub).returns(stub_everything('pubsub')) + server.stubs(:pubsub).returns(stub_everything("pubsub")) open_connection server: server end def open_connection(server:) - env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket" @connection = Connection.new(server, env) @connection.process diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb index a7a61d8d6f..36e1d3c095 100644 --- a/actioncable/test/connection/stream_test.rb +++ b/actioncable/test/connection/stream_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::Connection::StreamTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -34,17 +34,15 @@ class ActionCable::Connection::StreamTest < ActionCable::TestCase [ EOFError, Errno::ECONNRESET ].each do |closed_exception| test "closes socket on #{closed_exception}" do - skip if ENV['FAYE'].present? - run_in_eventmachine do connection = open_connection # Internal hax = :( client = connection.websocket.send(:websocket) - client.instance_variable_get('@stream').instance_variable_get('@rack_hijack_io').expects(:write).raises(closed_exception, 'foo') + client.instance_variable_get("@stream").instance_variable_get("@rack_hijack_io").expects(:write).raises(closed_exception, "foo") client.expects(:client_gone) - client.write('boo') + client.write("boo") assert_equal [], connection.errors end end @@ -52,10 +50,10 @@ class ActionCable::Connection::StreamTest < ActionCable::TestCase private def open_connection - env = Rack::MockRequest.env_for '/test', - 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' - env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + env = Rack::MockRequest.env_for "/test", + "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", + "HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com" + env["rack.hijack"] = -> { env["rack.hijack_io"] = StringIO.new } Connection.new(@server, env).tap do |connection| connection.process diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb index eca0c31060..87484765e5 100644 --- a/actioncable/test/connection/string_identifier_test.rb +++ b/actioncable/test/connection/string_identifier_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -24,13 +24,13 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase protected def open_connection_with_stubbed_pubsub @server = TestServer.new - @server.stubs(:pubsub).returns(stub_everything('pubsub')) + @server.stubs(:pubsub).returns(stub_everything("pubsub")) open_connection end def open_connection - env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket" @connection = Connection.new(@server, env) @connection.process diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb index a5b1e5dcf3..a1c8a4613c 100644 --- a/actioncable/test/connection/subscriptions_test.rb +++ b/actioncable/test/connection/subscriptions_test.rb @@ -1,4 +1,4 @@ -require 'test_helper' +require "test_helper" class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -25,7 +25,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase setup do @server = TestServer.new - @chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') + @chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: "ActionCable::Connection::SubscriptionsTest::ChatChannel") end test "subscribe command" do @@ -42,7 +42,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase run_in_eventmachine do setup_connection - @subscriptions.execute_command 'command' => 'subscribe' + @subscriptions.execute_command "command" => "subscribe" assert @subscriptions.identifiers.empty? end end @@ -55,7 +55,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase channel = subscribe_to_chat_channel channel.expects(:unsubscribe_from_channel) - @subscriptions.execute_command 'command' => 'unsubscribe', 'identifier' => @chat_identifier + @subscriptions.execute_command "command" => "unsubscribe", "identifier" => @chat_identifier assert @subscriptions.identifiers.empty? end end @@ -64,7 +64,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase run_in_eventmachine do setup_connection - @subscriptions.execute_command 'command' => 'unsubscribe' + @subscriptions.execute_command "command" => "unsubscribe" assert @subscriptions.identifiers.empty? end end @@ -74,8 +74,8 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase setup_connection channel = subscribe_to_chat_channel - data = { 'content' => 'Hello World!', 'action' => 'speak' } - @subscriptions.execute_command 'command' => 'message', 'identifier' => @chat_identifier, 'data' => ActiveSupport::JSON.encode(data) + data = { "content" => "Hello World!", "action" => "speak" } + @subscriptions.execute_command "command" => "message", "identifier" => @chat_identifier, "data" => ActiveSupport::JSON.encode(data) assert_equal [ data ], channel.lines end @@ -87,7 +87,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase channel1 = subscribe_to_chat_channel - channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') + channel2_id = ActiveSupport::JSON.encode(id: 2, channel: "ActionCable::Connection::SubscriptionsTest::ChatChannel") channel2 = subscribe_to_chat_channel(channel2_id) channel1.expects(:unsubscribe_from_channel) @@ -99,14 +99,14 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase private def subscribe_to_chat_channel(identifier = @chat_identifier) - @subscriptions.execute_command 'command' => 'subscribe', 'identifier' => identifier + @subscriptions.execute_command "command" => "subscribe", "identifier" => identifier assert_equal identifier, @subscriptions.identifiers.last - @subscriptions.send :find, 'identifier' => identifier + @subscriptions.send :find, "identifier" => identifier end def setup_connection - env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket" @connection = Connection.new(@server, env) @subscriptions = ActionCable::Connection::Subscriptions.new(@connection) diff --git a/actioncable/test/server/base_test.rb b/actioncable/test/server/base_test.rb new file mode 100644 index 0000000000..f0a51c5a7d --- /dev/null +++ b/actioncable/test/server/base_test.rb @@ -0,0 +1,33 @@ +require "test_helper" +require "stubs/test_server" +require "active_support/core_ext/hash/indifferent_access" + +class BaseTest < ActiveSupport::TestCase + def setup + @server = ActionCable::Server::Base.new + @server.config.cable = { adapter: "async" }.with_indifferent_access + end + + class FakeConnection + def close + end + end + + test "#restart closes all open connections" do + conn = FakeConnection.new + @server.add_connection(conn) + + conn.expects(:close) + @server.restart + end + + test "#restart shuts down worker pool" do + @server.worker_pool.expects(:halt) + @server.restart + end + + test "#restart shuts down pub/sub adapter" do + @server.pubsub.expects(:shutdown) + @server.restart + end +end diff --git a/actioncable/test/stubs/room.rb b/actioncable/test/stubs/room.rb index cd66a0b687..1664b07d12 100644 --- a/actioncable/test/stubs/room.rb +++ b/actioncable/test/stubs/room.rb @@ -1,7 +1,7 @@ class Room attr_reader :id, :name - def initialize(id, name='Campfire') + def initialize(id, name = "Campfire") @id = id @name = name end diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index 885450dda6..cd2e219d88 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -1,4 +1,4 @@ -require 'stubs/user' +require "stubs/user" class TestConnection attr_reader :identifiers, :logger, :current_user, :server, :transmissions diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index b86f422a13..5bf2a151dc 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -1,4 +1,4 @@ -require 'ostruct' +require "ostruct" class TestServer include ActionCable::Server::Connections @@ -10,14 +10,8 @@ class TestServer @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) @config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter) - @config.use_faye = ENV['FAYE'].present? - @config.client_socket_class = if @config.use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - - @mutex = Monitor.new + + @mutex = Monitor.new end def pubsub @@ -25,11 +19,9 @@ class TestServer end def event_loop - @event_loop ||= if @config.use_faye - ActionCable::Connection::FayeEventLoop.new - else - ActionCable::Connection::StreamEventLoop.new - end + @event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop| + loop.instance_variable_set(:@executor, Concurrent.global_io_executor) + end end def worker_pool diff --git a/actioncable/test/subscription_adapter/async_test.rb b/actioncable/test/subscription_adapter/async_test.rb index 8f413f14c2..7bc2e55d40 100644 --- a/actioncable/test/subscription_adapter/async_test.rb +++ b/actioncable/test/subscription_adapter/async_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require_relative './common' +require "test_helper" +require_relative "./common" class AsyncAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest @@ -12,6 +12,6 @@ class AsyncAdapterTest < ActionCable::TestCase end def cable_config - { adapter: 'async' } + { adapter: "async" } end end diff --git a/actioncable/test/subscription_adapter/base_test.rb b/actioncable/test/subscription_adapter/base_test.rb index 256dce673f..212ea49d2f 100644 --- a/actioncable/test/subscription_adapter/base_test.rb +++ b/actioncable/test/subscription_adapter/base_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require 'stubs/test_server' +require "test_helper" +require "stubs/test_server" class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS @@ -15,31 +15,31 @@ class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase test "#broadcast returns NotImplementedError by default" do assert_raises NotImplementedError do - BrokenAdapter.new(@server).broadcast('channel', 'payload') + BrokenAdapter.new(@server).broadcast("channel", "payload") end end test "#subscribe returns NotImplementedError by default" do - callback = lambda { puts 'callback' } - success_callback = lambda { puts 'success' } + callback = lambda { puts "callback" } + success_callback = lambda { puts "success" } assert_raises NotImplementedError do - BrokenAdapter.new(@server).subscribe('channel', callback, success_callback) + BrokenAdapter.new(@server).subscribe("channel", callback, success_callback) end end test "#unsubscribe returns NotImplementedError by default" do - callback = lambda { puts 'callback' } + callback = lambda { puts "callback" } assert_raises NotImplementedError do - BrokenAdapter.new(@server).unsubscribe('channel', callback) + BrokenAdapter.new(@server).unsubscribe("channel", callback) end end # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT test "#broadcast is implemented" do - broadcast = SuccessAdapter.new(@server).broadcast('channel', 'payload') + broadcast = SuccessAdapter.new(@server).broadcast("channel", "payload") assert_respond_to(SuccessAdapter.new(@server), :broadcast) @@ -49,9 +49,9 @@ class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase end test "#subscribe is implemented" do - callback = lambda { puts 'callback' } - success_callback = lambda { puts 'success' } - subscribe = SuccessAdapter.new(@server).subscribe('channel', callback, success_callback) + callback = lambda { puts "callback" } + success_callback = lambda { puts "success" } + subscribe = SuccessAdapter.new(@server).subscribe("channel", callback, success_callback) assert_respond_to(SuccessAdapter.new(@server), :subscribe) @@ -61,8 +61,8 @@ class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase end test "#unsubscribe is implemented" do - callback = lambda { puts 'callback' } - unsubscribe = SuccessAdapter.new(@server).unsubscribe('channel', callback) + callback = lambda { puts "callback" } + unsubscribe = SuccessAdapter.new(@server).unsubscribe("channel", callback) assert_respond_to(SuccessAdapter.new(@server), :unsubscribe) diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 285c690df0..3aa88c2caa 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,8 +1,8 @@ -require 'test_helper' -require 'concurrent' +require "test_helper" +require "concurrent" -require 'active_support/core_ext/hash/indifferent_access' -require 'pathname' +require "active_support/core_ext/hash/indifferent_access" +require "pathname" module CommonSubscriptionAdapterTest WAIT_WHEN_EXPECTING_EVENT = 3 @@ -11,7 +11,7 @@ module CommonSubscriptionAdapterTest def setup server = ActionCable::Server::Base.new server.config.cable = cable_config.with_indifferent_access - server.config.use_faye = ENV['FAYE'].present? + server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } adapter_klass = server.config.pubsub_adapter @@ -20,10 +20,9 @@ module CommonSubscriptionAdapterTest end def teardown - [@rx_adapter, @tx_adapter].uniq.each(&:shutdown) + [@rx_adapter, @tx_adapter].uniq.compact.each(&:shutdown) end - def subscribe_as_queue(channel, adapter = @rx_adapter) queue = Queue.new @@ -41,77 +40,76 @@ module CommonSubscriptionAdapterTest adapter.unsubscribe(channel, callback) if subscribed.set? end - def test_subscribe_and_unsubscribe - subscribe_as_queue('channel') do |queue| + subscribe_as_queue("channel") do |queue| end end def test_basic_broadcast - subscribe_as_queue('channel') do |queue| - @tx_adapter.broadcast('channel', 'hello world') + subscribe_as_queue("channel") do |queue| + @tx_adapter.broadcast("channel", "hello world") - assert_equal 'hello world', queue.pop + assert_equal "hello world", queue.pop end end def test_broadcast_after_unsubscribe keep_queue = nil - subscribe_as_queue('channel') do |queue| + subscribe_as_queue("channel") do |queue| keep_queue = queue - @tx_adapter.broadcast('channel', 'hello world') + @tx_adapter.broadcast("channel", "hello world") - assert_equal 'hello world', queue.pop + assert_equal "hello world", queue.pop end - @tx_adapter.broadcast('channel', 'hello void') + @tx_adapter.broadcast("channel", "hello void") sleep WAIT_WHEN_NOT_EXPECTING_EVENT assert_empty keep_queue end def test_multiple_broadcast - subscribe_as_queue('channel') do |queue| - @tx_adapter.broadcast('channel', 'bananas') - @tx_adapter.broadcast('channel', 'apples') + subscribe_as_queue("channel") do |queue| + @tx_adapter.broadcast("channel", "bananas") + @tx_adapter.broadcast("channel", "apples") received = [] 2.times { received << queue.pop } - assert_equal ['apples', 'bananas'], received.sort + assert_equal ["apples", "bananas"], received.sort end end def test_identical_subscriptions - subscribe_as_queue('channel') do |queue| - subscribe_as_queue('channel') do |queue_2| - @tx_adapter.broadcast('channel', 'hello') + subscribe_as_queue("channel") do |queue| + subscribe_as_queue("channel") do |queue_2| + @tx_adapter.broadcast("channel", "hello") - assert_equal 'hello', queue_2.pop + assert_equal "hello", queue_2.pop end - assert_equal 'hello', queue.pop + assert_equal "hello", queue.pop end end def test_simultaneous_subscriptions - subscribe_as_queue('channel') do |queue| - subscribe_as_queue('other channel') do |queue_2| - @tx_adapter.broadcast('channel', 'apples') - @tx_adapter.broadcast('other channel', 'oranges') + subscribe_as_queue("channel") do |queue| + subscribe_as_queue("other channel") do |queue_2| + @tx_adapter.broadcast("channel", "apples") + @tx_adapter.broadcast("other channel", "oranges") - assert_equal 'apples', queue.pop - assert_equal 'oranges', queue_2.pop + assert_equal "apples", queue.pop + assert_equal "oranges", queue_2.pop end end end def test_channel_filtered_broadcast - subscribe_as_queue('channel') do |queue| - @tx_adapter.broadcast('other channel', 'one') - @tx_adapter.broadcast('channel', 'two') + subscribe_as_queue("channel") do |queue| + @tx_adapter.broadcast("other channel", "one") + @tx_adapter.broadcast("channel", "two") - assert_equal 'two', queue.pop + assert_equal "two", queue.pop end end end diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb index 6d20e6ed78..f316bc46ef 100644 --- a/actioncable/test/subscription_adapter/evented_redis_test.rb +++ b/actioncable/test/subscription_adapter/evented_redis_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require_relative './common' +require "test_helper" +require_relative "./common" class EventedRedisAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest @@ -12,10 +12,18 @@ class EventedRedisAdapterTest < ActionCable::TestCase end def teardown + super + + # Ensure EM is shut down before we re-enable warnings + EventMachine.reactor_thread.tap do |thread| + EventMachine.stop + thread.join + end + $VERBOSE = @previous_verbose end def cable_config - { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' } + { adapter: "evented_redis", url: "redis://127.0.0.1:6379/12" } end end diff --git a/actioncable/test/subscription_adapter/inline_test.rb b/actioncable/test/subscription_adapter/inline_test.rb index 75ea51e6b3..52bfa00aba 100644 --- a/actioncable/test/subscription_adapter/inline_test.rb +++ b/actioncable/test/subscription_adapter/inline_test.rb @@ -1,5 +1,5 @@ -require 'test_helper' -require_relative './common' +require "test_helper" +require_relative "./common" class InlineAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest @@ -12,6 +12,6 @@ class InlineAdapterTest < ActionCable::TestCase end def cable_config - { adapter: 'inline' } + { adapter: "inline" } end end diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb index 214352a0b2..beb6efab28 100644 --- a/actioncable/test/subscription_adapter/postgresql_test.rb +++ b/actioncable/test/subscription_adapter/postgresql_test.rb @@ -1,18 +1,18 @@ -require 'test_helper' -require_relative './common' +require "test_helper" +require_relative "./common" -require 'active_record' +require "active_record" class PostgresqlAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest def setup - database_config = { 'adapter' => 'postgresql', 'database' => 'activerecord_unittest' } - ar_tests = File.expand_path('../../../activerecord/test', __dir__) + database_config = { "adapter" => "postgresql", "database" => "activerecord_unittest" } + ar_tests = File.expand_path("../../../activerecord/test", __dir__) if Dir.exist?(ar_tests) - require File.join(ar_tests, 'config') - require File.join(ar_tests, 'support/config') - local_config = ARTest.config['arunit'] + require File.join(ar_tests, "config") + require File.join(ar_tests, "support/config") + local_config = ARTest.config["arunit"] database_config.update local_config if local_config end @@ -35,6 +35,6 @@ class PostgresqlAdapterTest < ActionCable::TestCase end def cable_config - { adapter: 'postgresql' } + { adapter: "postgresql" } end end diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb index 4f34dd86c9..2ba5636656 100644 --- a/actioncable/test/subscription_adapter/redis_test.rb +++ b/actioncable/test/subscription_adapter/redis_test.rb @@ -1,16 +1,16 @@ -require 'test_helper' -require_relative './common' +require "test_helper" +require_relative "./common" class RedisAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest def cable_config - { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' } + { adapter: "redis", driver: "ruby", url: "redis://127.0.0.1:6379/12" } end end class RedisAdapterTest::Hiredis < RedisAdapterTest def cable_config - super.merge(driver: 'hiredis') + super.merge(driver: "hiredis") end end diff --git a/actioncable/test/subscription_adapter/subscriber_map_test.rb b/actioncable/test/subscription_adapter/subscriber_map_test.rb index 5965ac2b90..76b984c849 100644 --- a/actioncable/test/subscription_adapter/subscriber_map_test.rb +++ b/actioncable/test/subscription_adapter/subscriber_map_test.rb @@ -1,11 +1,11 @@ -require 'test_helper' +require "test_helper" class SubscriberMapTest < ActionCable::TestCase test "broadcast should not change subscribers" do setup_subscription_map origin = @subscription_map.instance_variable_get(:@subscribers).dup - @subscription_map.broadcast('not_exist_channel', '') + @subscription_map.broadcast("not_exist_channel", "") assert_equal origin, @subscription_map.instance_variable_get(:@subscribers) end diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 0a9ee7ce77..a47032753b 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -1,53 +1,19 @@ -require 'action_cable' -require 'active_support/testing/autorun' +require "action_cable" +require "active_support/testing/autorun" -require 'puma' -require 'mocha/setup' -require 'rack/mock' +require "puma" +require "mocha/setup" +require "rack/mock" begin - require 'byebug' + require "byebug" rescue LoadError end # Require all the stubs and models -Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } +Dir[File.dirname(__FILE__) + "/stubs/*.rb"].each { |file| require file } -if ENV['FAYE'].present? - require 'faye/websocket' - class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end - end -end - -module EventMachineConcurrencyHelpers - def wait_for_async - EM.run_deferred_callbacks - end - - def run_in_eventmachine - failure = nil - EM.run do - begin - yield - rescue => ex - failure = ex - ensure - wait_for_async - EM.stop if EM.reactor_running? - end - end - raise failure if failure - end -end - -module ConcurrentRubyConcurrencyHelpers +class ActionCable::TestCase < ActiveSupport::TestCase def wait_for_async wait_for_executor Concurrent.global_io_executor end @@ -56,18 +22,14 @@ module ConcurrentRubyConcurrencyHelpers yield wait_for_async end -end - -class ActionCable::TestCase < ActiveSupport::TestCase - if ENV['FAYE'].present? - include EventMachineConcurrencyHelpers - else - include ConcurrentRubyConcurrencyHelpers - end def wait_for_executor(executor) + # do not wait forever, wait 2s + timeout = 2 until executor.completed_task_count == executor.scheduled_task_count sleep 0.1 + timeout -= 0.1 + raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0 end end end diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb index e2c81fe312..3385593f74 100644 --- a/actioncable/test/worker_test.rb +++ b/actioncable/test/worker_test.rb @@ -1,4 +1,4 @@ -require 'test_helper' +require "test_helper" class WorkerTest < ActiveSupport::TestCase class Receiver @@ -9,7 +9,7 @@ class WorkerTest < ActiveSupport::TestCase end def process(message) - @last_action = [ :process, message ] + @last_action = [ :process, message ] end def connection |