diff options
Diffstat (limited to 'actioncable')
17 files changed, 244 insertions, 77 deletions
diff --git a/actioncable/Rakefile b/actioncable/Rakefile index 1d77fc7067..5ba7b7f7f6 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -19,6 +19,14 @@ Rake::TestTask.new do |t| t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION) end +namespace :test do + task :isolated do + Dir.glob("test/**/*_test.rb").all? do |file| + sh(Gem.ruby, '-w', '-Ilib:test', file) + end or raise "Failures" + end +end + namespace :assets do root_path = Pathname.new(dir) destination_path = root_path.join("lib/assets/compiled") diff --git a/actioncable/app/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb index 6a8b4eeb85..f0422d9d9c 100644 --- a/actioncable/app/assets/javascripts/action_cable.coffee.erb +++ b/actioncable/app/assets/javascripts/action_cable.coffee.erb @@ -4,12 +4,13 @@ @ActionCable = INTERNAL: <%= ActionCable::INTERNAL.to_json %> - createConsumer: (url = @getConfig("url")) -> + createConsumer: (url) -> + url ?= @getConfig("url") ? @INTERNAL.default_mount_path new ActionCable.Consumer @createWebSocketURL(url) getConfig: (name) -> element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") ? '/cable' + element?.getAttribute("content") createWebSocketURL: (url) -> if url and not /^wss?:/i.test(url) diff --git a/actioncable/app/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee index 25793ea3d3..3a139acf3a 100644 --- a/actioncable/app/assets/javascripts/action_cable/connection.coffee +++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee @@ -1,3 +1,5 @@ +#= require ./connection_monitor + # Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. {message_types} = ActionCable.INTERNAL @@ -6,11 +8,10 @@ class ActionCable.Connection @reopenDelay: 500 constructor: (@consumer) -> + {@subscriptions} = @consumer + @monitor = new ActionCable.ConnectionMonitor this send: (data) -> - unless @isOpen() - @open() - if @isOpen() @webSocket.send(JSON.stringify(data)) true @@ -18,14 +19,15 @@ class ActionCable.Connection false open: => - if @isAlive() - ActionCable.log("Attemped to open WebSocket, but existing socket is #{@getState()}") + if @isActive() + ActionCable.log("Attempted to open WebSocket, but existing socket is #{@getState()}") throw new Error("Existing connection must be closed before opening") else ActionCable.log("Opening WebSocket, current state is #{@getState()}") @uninstallEventHandlers() if @webSocket? @webSocket = new WebSocket(@consumer.url) @installEventHandlers() + @monitor.start() true close: -> @@ -33,7 +35,7 @@ class ActionCable.Connection reopen: -> ActionCable.log("Reopening WebSocket, current state is #{@getState()}") - if @isAlive() + if @isActive() try @close() catch error @@ -47,10 +49,10 @@ class ActionCable.Connection isOpen: -> @isState("open") - # Private + isActive: -> + @isState("open", "connecting") - isAlive: -> - @webSocket? and not @isState("closing", "closed") + # Private isState: (states...) -> @getState() in states @@ -75,20 +77,20 @@ class ActionCable.Connection {identifier, message, type} = JSON.parse(event.data) switch type when message_types.welcome - @consumer.connectionMonitor.connected() + @monitor.recordConnect() when message_types.ping - @consumer.connectionMonitor.ping() + @monitor.recordPing() when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") + @subscriptions.notify(identifier, "connected") when message_types.rejection - @consumer.subscriptions.reject(identifier) + @subscriptions.reject(identifier) else - @consumer.subscriptions.notify(identifier, "received", message) + @subscriptions.notify(identifier, "received", message) open: -> ActionCable.log("WebSocket onopen event") @disconnected = false - @consumer.subscriptions.reload() + @subscriptions.reload() close: -> ActionCable.log("WebSocket onclose event") @@ -101,5 +103,5 @@ class ActionCable.Connection disconnect: -> return if @disconnected @disconnected = true - @consumer.connectionMonitor.disconnected() - @consumer.subscriptions.notifyAll("disconnected") + @subscriptions.notifyAll("disconnected") + @monitor.recordDisconnect() diff --git a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee index 904a426644..0cc675fa94 100644 --- a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee +++ b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee @@ -7,60 +7,69 @@ class ActionCable.ConnectionMonitor @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - constructor: (@consumer) -> - @start() + constructor: (@connection) -> + @reconnectAttempts = 0 - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - ActionCable.log("ConnectionMonitor connected") + start: -> + unless @isRunning() + @startedAt = now() + delete @stoppedAt + @startPolling() + document.addEventListener("visibilitychange", @visibilityDidChange) + ActionCable.log("ConnectionMonitor started. pollInterval = #{@getPollInterval()} ms") - disconnected: -> - @disconnectedAt = now() - ActionCable.log("ConnectionMonitor disconnected") + stop: -> + if @isRunning() + @stoppedAt = now() + @stopPolling() + document.removeEventListener("visibilitychange", @visibilityDidChange) + ActionCable.log("ConnectionMonitor stopped") + + isRunning: -> + @startedAt? and not @stoppedAt? - ping: -> + recordPing: -> @pingedAt = now() - reset: -> + recordConnect: -> @reconnectAttempts = 0 - @consumer.connection.isOpen() + @recordPing() + delete @disconnectedAt + ActionCable.log("ConnectionMonitor recorded connect") - start: -> - @reset() - delete @stoppedAt - @startedAt = now() + recordDisconnect: -> + @disconnectedAt = now() + ActionCable.log("ConnectionMonitor recorded disconnect") + + # Private + + startPolling: -> + @stopPolling() @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - ActionCable.log("ConnectionMonitor started, pollInterval is #{@getInterval()}ms") - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - ActionCable.log("ConnectionMonitor stopped") + stopPolling: -> + clearTimeout(@pollTimeout) poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() + @pollTimeout = setTimeout => + @reconnectIfStale() + @poll() + , @getPollInterval() - getInterval: -> + getPollInterval: -> {min, max} = @constructor.pollInterval interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 + Math.round(clamp(interval, min, max) * 1000) reconnectIfStale: -> if @connectionIsStale() - ActionCable.log("ConnectionMonitor detected stale connection, reconnectAttempts = #{@reconnectAttempts}") + ActionCable.log("ConnectionMonitor detected stale connection. reconnectAttempts = #{@reconnectAttempts}, pollInterval = #{@getPollInterval()} ms, time disconnected = #{secondsSince(@disconnectedAt)} s, stale threshold = #{@constructor.staleThreshold} s") @reconnectAttempts++ if @disconnectedRecently() - ActionCable.log("ConnectionMonitor skipping reopen because recently disconnected at #{@disconnectedAt}") + ActionCable.log("ConnectionMonitor skipping reopening recent disconnect") else ActionCable.log("ConnectionMonitor reopening") - @consumer.connection.reopen() + @connection.reopen() connectionIsStale: -> secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold @@ -71,9 +80,9 @@ class ActionCable.ConnectionMonitor visibilityDidChange: => if document.visibilityState is "visible" setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - ActionCable.log("ConnectionMonitor reopening stale connection after visibilitychange to #{document.visibilityState}") - @consumer.connection.reopen() + if @connectionIsStale() or not @connection.isOpen() + ActionCable.log("ConnectionMonitor reopening stale connection on visibilitychange. visbilityState = #{document.visibilityState}") + @connection.reopen() , 200 now = -> diff --git a/actioncable/app/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee index 717c0641a9..7aae1ed8ed 100644 --- a/actioncable/app/assets/javascripts/action_cable/consumer.coffee +++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee @@ -1,5 +1,4 @@ #= require ./connection -#= require ./connection_monitor #= require ./subscriptions #= require ./subscription @@ -19,7 +18,10 @@ class ActionCable.Consumer constructor: (@url) -> @subscriptions = new ActionCable.Subscriptions this @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this send: (data) -> @connection.send(data) + + ensureActiveConnection: -> + unless @connection.isActive() + @connection.open() diff --git a/actioncable/app/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee index 339d676933..61a3fb1309 100644 --- a/actioncable/app/assets/javascripts/action_cable/subscription.coffee +++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee @@ -1,5 +1,5 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding # Channel instance on the server side. # # An example demonstrates the basic functionality: @@ -7,13 +7,13 @@ # App.appearance = App.cable.subscriptions.create "AppearanceChannel", # connected: -> # # Called once the subscription has been successfully completed -# +# # appear: -> # @perform 'appear', appearing_on: @appearingOn() -# +# # away: -> # @perform 'away' -# +# # appearingOn: -> # $('main').data 'appearing-on' # @@ -27,15 +27,15 @@ # def subscribed # current_user.appear # end -# +# # def unsubscribed # current_user.disappear # end -# +# # def appear(data) # current_user.appear on: data['appearing_on'] # end -# +# # def away # current_user.away # end @@ -44,11 +44,9 @@ # The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. # The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> + constructor: (@consumer, params = {}, mixin) -> @identifier = JSON.stringify(params) extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer # Perform a channel action with the optional data passed as an attribute perform: (action, data = {}) -> @@ -59,7 +57,7 @@ class ActionCable.Subscription @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) unsubscribe: -> - @subscriptions.remove(this) + @consumer.subscriptions.remove(this) extend = (object, properties) -> if properties? diff --git a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee index 2443bca14a..aa052bf5d8 100644 --- a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee +++ b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee @@ -13,28 +13,33 @@ class ActionCable.Subscriptions create: (channelName, mixin) -> channel = channelName params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin + subscription = new ActionCable.Subscription @consumer, params, mixin + @add(subscription) # Private add: (subscription) -> @subscriptions.push(subscription) + @consumer.ensureActiveConnection() @notify(subscription, "initialized") @sendCommand(subscription, "subscribe") + subscription remove: (subscription) -> @forget(subscription) - unless @findAll(subscription.identifier).length @sendCommand(subscription, "unsubscribe") + subscription reject: (identifier) -> for subscription in @findAll(identifier) @forget(subscription) @notify(subscription, "rejected") + subscription forget: (subscription) -> @subscriptions = (s for s in @subscriptions when s isnt subscription) + subscription findAll: (identifier) -> s for s in @subscriptions when s.identifier is identifier @@ -59,4 +64,3 @@ class ActionCable.Subscriptions sendCommand: (subscription, command) -> {identifier} = subscription @consumer.send({command, identifier}) - diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index a8e4d1cb25..68a5fff3e7 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -34,7 +34,8 @@ module ActionCable ping: 'ping'.freeze, confirmation: 'confirm_subscription'.freeze, rejection: 'reject_subscription'.freeze - } + }, + default_mount_path: '/cable'.freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 9e4dbcd6e6..7d6de78582 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -71,6 +71,8 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb index c9139b6858..47d09a9e14 100644 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -36,6 +36,7 @@ module ActionCable @faye.on(:open) { |event| @event_target.on_open } @faye.on(:message) { |event| @event_target.on_message(event.data) } @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } end end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 2d97b28c09..0cf59091bc 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -29,7 +29,7 @@ module ActionCable def write(data) return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data) if @stream_send - rescue EOFError + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index c90aadaf2c..7dc541d00c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access" module ActionCable class Railtie < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.mount_path = '/cable' + config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] config.eager_load_namespaces << ActionCable diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index e6c862959b..256876cf30 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -51,7 +51,7 @@ module ActionCable @redis_connection_for_subscriptions || @server.mutex.synchronize do @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @logger.error "[ActionCable] Redis reconnect failed." end end end diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index 6debe40c91..d89ab45816 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -21,7 +21,7 @@ module Rails protected def file_name - @_file_name ||= super.gsub(/\_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, '') end # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb new file mode 100644 index 0000000000..dd730e348f --- /dev/null +++ b/actioncable/test/connection/client_socket_test.rb @@ -0,0 +1,65 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::Connection::StreamTest < ActionCable::TestCase + class Connection < ActionCable::Connection::Base + attr_reader :websocket, :subscriptions, :message_buffer, :connected + attr_reader :errors + + def initialize(*) + super + @errors = [] + end + + def connect + @connected = true + end + + def disconnect + @connected = false + end + + def send_async(method, *args) + send method, *args + end + + def on_error(message) + @errors << message + end + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + test 'delegate socket errors to on_error handler' do + skip if ENV['FAYE'].present? + + run_in_eventmachine do + connection = open_connection + + # Internal hax = :( + client = connection.websocket.send(:websocket) + client.instance_variable_get('@stream').expects(:write).raises('foo') + client.expects(:client_gone).never + + client.write('boo') + assert_equal %w[ foo ], connection.errors + end + end + + private + def open_connection + env = Rack::MockRequest.env_for '/test', + 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + + Connection.new(@server, env).tap do |connection| + connection.process + connection.send :handle_open + assert connection.connected + end + end +end diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb new file mode 100644 index 0000000000..d5aad63648 --- /dev/null +++ b/actioncable/test/connection/stream_test.rb @@ -0,0 +1,67 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::Connection::StreamTest < ActionCable::TestCase + class Connection < ActionCable::Connection::Base + attr_reader :websocket, :subscriptions, :message_buffer, :connected + attr_reader :errors + + def initialize(*) + super + @errors = [] + end + + def connect + @connected = true + end + + def disconnect + @connected = false + end + + def send_async(method, *args) + send method, *args + end + + def on_error(message) + @errors << message + end + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + [ EOFError, Errno::ECONNRESET ].each do |closed_exception| + test "closes socket on #{closed_exception}" do + skip if ENV['FAYE'].present? + + run_in_eventmachine do + connection = open_connection + + # Internal hax = :( + client = connection.websocket.send(:websocket) + client.instance_variable_get('@stream').instance_variable_get('@rack_hijack_io').expects(:write).raises(closed_exception, 'foo') + client.expects(:client_gone) + + client.write('boo') + assert_equal [], connection.errors + end + end + end + + private + def open_connection + env = Rack::MockRequest.env_for '/test', + 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + env['rack.hijack'] = -> { env['rack.hijack_io'] = StringIO.new } + + Connection.new(@server, env).tap do |connection| + connection.process + connection.send :handle_open + assert connection.connected + end + end +end diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb index 64c632b0cd..3b4fadbb2a 100644 --- a/actioncable/test/subscription_adapter/postgresql_test.rb +++ b/actioncable/test/subscription_adapter/postgresql_test.rb @@ -15,8 +15,15 @@ class PostgresqlAdapterTest < ActionCable::TestCase local_config = ARTest.config['arunit'] database_config.update local_config if local_config end + ActiveRecord::Base.establish_connection database_config + begin + ActiveRecord::Base.connection + rescue + skip "Couldn't connect to PostgreSQL: #{database_config.inspect}" + end + super end |