diff options
Diffstat (limited to 'actioncable/app')
15 files changed, 1103 insertions, 433 deletions
diff --git a/actioncable/app/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb deleted file mode 100644 index e0758dae72..0000000000 --- a/actioncable/app/assets/javascripts/action_cable.coffee.erb +++ /dev/null @@ -1,38 +0,0 @@ -#= export ActionCable -#= require_self -#= require ./action_cable/consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - WebSocket: window.WebSocket - logger: window.console - - createConsumer: (url) -> - url ?= @getConfig("url") ? @INTERNAL.default_mount_path - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url - - startDebugging: -> - @debugging = true - - stopDebugging: -> - @debugging = null - - log: (messages...) -> - if @debugging - messages.push(Date.now()) - @logger.log("[ActionCable]", messages...) diff --git a/actioncable/app/assets/javascripts/action_cable.js b/actioncable/app/assets/javascripts/action_cable.js new file mode 100644 index 0000000000..8349361405 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable.js @@ -0,0 +1,517 @@ +(function(global, factory) { + typeof exports === "object" && typeof module !== "undefined" ? factory(exports) : typeof define === "function" && define.amd ? define([ "exports" ], factory) : factory(global.ActionCable = {}); +})(this, function(exports) { + "use strict"; + var adapters = { + logger: self.console, + WebSocket: self.WebSocket + }; + var logger = { + log: function log() { + if (this.enabled) { + var _adapters$logger; + for (var _len = arguments.length, messages = Array(_len), _key = 0; _key < _len; _key++) { + messages[_key] = arguments[_key]; + } + messages.push(Date.now()); + (_adapters$logger = adapters.logger).log.apply(_adapters$logger, [ "[ActionCable]" ].concat(messages)); + } + } + }; + var _typeof = typeof Symbol === "function" && typeof Symbol.iterator === "symbol" ? function(obj) { + return typeof obj; + } : function(obj) { + return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; + }; + var classCallCheck = function(instance, Constructor) { + if (!(instance instanceof Constructor)) { + throw new TypeError("Cannot call a class as a function"); + } + }; + var createClass = function() { + function defineProperties(target, props) { + for (var i = 0; i < props.length; i++) { + var descriptor = props[i]; + descriptor.enumerable = descriptor.enumerable || false; + descriptor.configurable = true; + if ("value" in descriptor) descriptor.writable = true; + Object.defineProperty(target, descriptor.key, descriptor); + } + } + return function(Constructor, protoProps, staticProps) { + if (protoProps) defineProperties(Constructor.prototype, protoProps); + if (staticProps) defineProperties(Constructor, staticProps); + return Constructor; + }; + }(); + var now = function now() { + return new Date().getTime(); + }; + var secondsSince = function secondsSince(time) { + return (now() - time) / 1e3; + }; + var clamp = function clamp(number, min, max) { + return Math.max(min, Math.min(max, number)); + }; + var ConnectionMonitor = function() { + function ConnectionMonitor(connection) { + classCallCheck(this, ConnectionMonitor); + this.visibilityDidChange = this.visibilityDidChange.bind(this); + this.connection = connection; + this.reconnectAttempts = 0; + } + ConnectionMonitor.prototype.start = function start() { + if (!this.isRunning()) { + this.startedAt = now(); + delete this.stoppedAt; + this.startPolling(); + addEventListener("visibilitychange", this.visibilityDidChange); + logger.log("ConnectionMonitor started. pollInterval = " + this.getPollInterval() + " ms"); + } + }; + ConnectionMonitor.prototype.stop = function stop() { + if (this.isRunning()) { + this.stoppedAt = now(); + this.stopPolling(); + removeEventListener("visibilitychange", this.visibilityDidChange); + logger.log("ConnectionMonitor stopped"); + } + }; + ConnectionMonitor.prototype.isRunning = function isRunning() { + return this.startedAt && !this.stoppedAt; + }; + ConnectionMonitor.prototype.recordPing = function recordPing() { + this.pingedAt = now(); + }; + ConnectionMonitor.prototype.recordConnect = function recordConnect() { + this.reconnectAttempts = 0; + this.recordPing(); + delete this.disconnectedAt; + logger.log("ConnectionMonitor recorded connect"); + }; + ConnectionMonitor.prototype.recordDisconnect = function recordDisconnect() { + this.disconnectedAt = now(); + logger.log("ConnectionMonitor recorded disconnect"); + }; + ConnectionMonitor.prototype.startPolling = function startPolling() { + this.stopPolling(); + this.poll(); + }; + ConnectionMonitor.prototype.stopPolling = function stopPolling() { + clearTimeout(this.pollTimeout); + }; + ConnectionMonitor.prototype.poll = function poll() { + var _this = this; + this.pollTimeout = setTimeout(function() { + _this.reconnectIfStale(); + _this.poll(); + }, this.getPollInterval()); + }; + ConnectionMonitor.prototype.getPollInterval = function getPollInterval() { + var _constructor$pollInte = this.constructor.pollInterval, min = _constructor$pollInte.min, max = _constructor$pollInte.max, multiplier = _constructor$pollInte.multiplier; + var interval = multiplier * Math.log(this.reconnectAttempts + 1); + return Math.round(clamp(interval, min, max) * 1e3); + }; + ConnectionMonitor.prototype.reconnectIfStale = function reconnectIfStale() { + if (this.connectionIsStale()) { + logger.log("ConnectionMonitor detected stale connection. reconnectAttempts = " + this.reconnectAttempts + ", pollInterval = " + this.getPollInterval() + " ms, time disconnected = " + secondsSince(this.disconnectedAt) + " s, stale threshold = " + this.constructor.staleThreshold + " s"); + this.reconnectAttempts++; + if (this.disconnectedRecently()) { + logger.log("ConnectionMonitor skipping reopening recent disconnect"); + } else { + logger.log("ConnectionMonitor reopening"); + this.connection.reopen(); + } + } + }; + ConnectionMonitor.prototype.connectionIsStale = function connectionIsStale() { + return secondsSince(this.pingedAt ? this.pingedAt : this.startedAt) > this.constructor.staleThreshold; + }; + ConnectionMonitor.prototype.disconnectedRecently = function disconnectedRecently() { + return this.disconnectedAt && secondsSince(this.disconnectedAt) < this.constructor.staleThreshold; + }; + ConnectionMonitor.prototype.visibilityDidChange = function visibilityDidChange() { + var _this2 = this; + if (document.visibilityState === "visible") { + setTimeout(function() { + if (_this2.connectionIsStale() || !_this2.connection.isOpen()) { + logger.log("ConnectionMonitor reopening stale connection on visibilitychange. visbilityState = " + document.visibilityState); + _this2.connection.reopen(); + } + }, 200); + } + }; + return ConnectionMonitor; + }(); + ConnectionMonitor.pollInterval = { + min: 3, + max: 30, + multiplier: 5 + }; + ConnectionMonitor.staleThreshold = 6; + var INTERNAL = { + message_types: { + welcome: "welcome", + disconnect: "disconnect", + ping: "ping", + confirmation: "confirm_subscription", + rejection: "reject_subscription" + }, + disconnect_reasons: { + unauthorized: "unauthorized", + invalid_request: "invalid_request", + server_restart: "server_restart" + }, + default_mount_path: "/cable", + protocols: [ "actioncable-v1-json", "actioncable-unsupported" ] + }; + var message_types = INTERNAL.message_types, protocols = INTERNAL.protocols; + var supportedProtocols = protocols.slice(0, protocols.length - 1); + var indexOf = [].indexOf; + var Connection = function() { + function Connection(consumer) { + classCallCheck(this, Connection); + this.open = this.open.bind(this); + this.consumer = consumer; + this.subscriptions = this.consumer.subscriptions; + this.monitor = new ConnectionMonitor(this); + this.disconnected = true; + } + Connection.prototype.send = function send(data) { + if (this.isOpen()) { + this.webSocket.send(JSON.stringify(data)); + return true; + } else { + return false; + } + }; + Connection.prototype.open = function open() { + if (this.isActive()) { + logger.log("Attempted to open WebSocket, but existing socket is " + this.getState()); + return false; + } else { + logger.log("Opening WebSocket, current state is " + this.getState() + ", subprotocols: " + protocols); + if (this.webSocket) { + this.uninstallEventHandlers(); + } + this.webSocket = new adapters.WebSocket(this.consumer.url, protocols); + this.installEventHandlers(); + this.monitor.start(); + return true; + } + }; + Connection.prototype.close = function close() { + var _ref = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : { + allowReconnect: true + }, allowReconnect = _ref.allowReconnect; + if (!allowReconnect) { + this.monitor.stop(); + } + if (this.isActive()) { + return this.webSocket.close(); + } + }; + Connection.prototype.reopen = function reopen() { + logger.log("Reopening WebSocket, current state is " + this.getState()); + if (this.isActive()) { + try { + return this.close(); + } catch (error) { + logger.log("Failed to reopen WebSocket", error); + } finally { + logger.log("Reopening WebSocket in " + this.constructor.reopenDelay + "ms"); + setTimeout(this.open, this.constructor.reopenDelay); + } + } else { + return this.open(); + } + }; + Connection.prototype.getProtocol = function getProtocol() { + if (this.webSocket) { + return this.webSocket.protocol; + } + }; + Connection.prototype.isOpen = function isOpen() { + return this.isState("open"); + }; + Connection.prototype.isActive = function isActive() { + return this.isState("open", "connecting"); + }; + Connection.prototype.isProtocolSupported = function isProtocolSupported() { + return indexOf.call(supportedProtocols, this.getProtocol()) >= 0; + }; + Connection.prototype.isState = function isState() { + for (var _len = arguments.length, states = Array(_len), _key = 0; _key < _len; _key++) { + states[_key] = arguments[_key]; + } + return indexOf.call(states, this.getState()) >= 0; + }; + Connection.prototype.getState = function getState() { + if (this.webSocket) { + for (var state in adapters.WebSocket) { + if (adapters.WebSocket[state] === this.webSocket.readyState) { + return state.toLowerCase(); + } + } + } + return null; + }; + Connection.prototype.installEventHandlers = function installEventHandlers() { + for (var eventName in this.events) { + var handler = this.events[eventName].bind(this); + this.webSocket["on" + eventName] = handler; + } + }; + Connection.prototype.uninstallEventHandlers = function uninstallEventHandlers() { + for (var eventName in this.events) { + this.webSocket["on" + eventName] = function() {}; + } + }; + return Connection; + }(); + Connection.reopenDelay = 500; + Connection.prototype.events = { + message: function message(event) { + if (!this.isProtocolSupported()) { + return; + } + var _JSON$parse = JSON.parse(event.data), identifier = _JSON$parse.identifier, message = _JSON$parse.message, reason = _JSON$parse.reason, reconnect = _JSON$parse.reconnect, type = _JSON$parse.type; + switch (type) { + case message_types.welcome: + this.monitor.recordConnect(); + return this.subscriptions.reload(); + + case message_types.disconnect: + logger.log("Disconnecting. Reason: " + reason); + return this.close({ + allowReconnect: reconnect + }); + + case message_types.ping: + return this.monitor.recordPing(); + + case message_types.confirmation: + return this.subscriptions.notify(identifier, "connected"); + + case message_types.rejection: + return this.subscriptions.reject(identifier); + + default: + return this.subscriptions.notify(identifier, "received", message); + } + }, + open: function open() { + logger.log("WebSocket onopen event, using '" + this.getProtocol() + "' subprotocol"); + this.disconnected = false; + if (!this.isProtocolSupported()) { + logger.log("Protocol is unsupported. Stopping monitor and disconnecting."); + return this.close({ + allowReconnect: false + }); + } + }, + close: function close(event) { + logger.log("WebSocket onclose event"); + if (this.disconnected) { + return; + } + this.disconnected = true; + this.monitor.recordDisconnect(); + return this.subscriptions.notifyAll("disconnected", { + willAttemptReconnect: this.monitor.isRunning() + }); + }, + error: function error() { + logger.log("WebSocket onerror event"); + } + }; + var extend = function extend(object, properties) { + if (properties != null) { + for (var key in properties) { + var value = properties[key]; + object[key] = value; + } + } + return object; + }; + var Subscription = function() { + function Subscription(consumer) { + var params = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; + var mixin = arguments[2]; + classCallCheck(this, Subscription); + this.consumer = consumer; + this.identifier = JSON.stringify(params); + extend(this, mixin); + } + Subscription.prototype.perform = function perform(action) { + var data = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; + data.action = action; + return this.send(data); + }; + Subscription.prototype.send = function send(data) { + return this.consumer.send({ + command: "message", + identifier: this.identifier, + data: JSON.stringify(data) + }); + }; + Subscription.prototype.unsubscribe = function unsubscribe() { + return this.consumer.subscriptions.remove(this); + }; + return Subscription; + }(); + var Subscriptions = function() { + function Subscriptions(consumer) { + classCallCheck(this, Subscriptions); + this.consumer = consumer; + this.subscriptions = []; + } + Subscriptions.prototype.create = function create(channelName, mixin) { + var channel = channelName; + var params = (typeof channel === "undefined" ? "undefined" : _typeof(channel)) === "object" ? channel : { + channel: channel + }; + var subscription = new Subscription(this.consumer, params, mixin); + return this.add(subscription); + }; + Subscriptions.prototype.add = function add(subscription) { + this.subscriptions.push(subscription); + this.consumer.ensureActiveConnection(); + this.notify(subscription, "initialized"); + this.sendCommand(subscription, "subscribe"); + return subscription; + }; + Subscriptions.prototype.remove = function remove(subscription) { + this.forget(subscription); + if (!this.findAll(subscription.identifier).length) { + this.sendCommand(subscription, "unsubscribe"); + } + return subscription; + }; + Subscriptions.prototype.reject = function reject(identifier) { + var _this = this; + return this.findAll(identifier).map(function(subscription) { + _this.forget(subscription); + _this.notify(subscription, "rejected"); + return subscription; + }); + }; + Subscriptions.prototype.forget = function forget(subscription) { + this.subscriptions = this.subscriptions.filter(function(s) { + return s !== subscription; + }); + return subscription; + }; + Subscriptions.prototype.findAll = function findAll(identifier) { + return this.subscriptions.filter(function(s) { + return s.identifier === identifier; + }); + }; + Subscriptions.prototype.reload = function reload() { + var _this2 = this; + return this.subscriptions.map(function(subscription) { + return _this2.sendCommand(subscription, "subscribe"); + }); + }; + Subscriptions.prototype.notifyAll = function notifyAll(callbackName) { + var _this3 = this; + for (var _len = arguments.length, args = Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) { + args[_key - 1] = arguments[_key]; + } + return this.subscriptions.map(function(subscription) { + return _this3.notify.apply(_this3, [ subscription, callbackName ].concat(args)); + }); + }; + Subscriptions.prototype.notify = function notify(subscription, callbackName) { + for (var _len2 = arguments.length, args = Array(_len2 > 2 ? _len2 - 2 : 0), _key2 = 2; _key2 < _len2; _key2++) { + args[_key2 - 2] = arguments[_key2]; + } + var subscriptions = void 0; + if (typeof subscription === "string") { + subscriptions = this.findAll(subscription); + } else { + subscriptions = [ subscription ]; + } + return subscriptions.map(function(subscription) { + return typeof subscription[callbackName] === "function" ? subscription[callbackName].apply(subscription, args) : undefined; + }); + }; + Subscriptions.prototype.sendCommand = function sendCommand(subscription, command) { + var identifier = subscription.identifier; + return this.consumer.send({ + command: command, + identifier: identifier + }); + }; + return Subscriptions; + }(); + var Consumer = function() { + function Consumer(url) { + classCallCheck(this, Consumer); + this._url = url; + this.subscriptions = new Subscriptions(this); + this.connection = new Connection(this); + } + Consumer.prototype.send = function send(data) { + return this.connection.send(data); + }; + Consumer.prototype.connect = function connect() { + return this.connection.open(); + }; + Consumer.prototype.disconnect = function disconnect() { + return this.connection.close({ + allowReconnect: false + }); + }; + Consumer.prototype.ensureActiveConnection = function ensureActiveConnection() { + if (!this.connection.isActive()) { + return this.connection.open(); + } + }; + createClass(Consumer, [ { + key: "url", + get: function get$$1() { + return createWebSocketURL(this._url); + } + } ]); + return Consumer; + }(); + function createWebSocketURL(url) { + if (typeof url === "function") { + url = url(); + } + if (url && !/^wss?:/i.test(url)) { + var a = document.createElement("a"); + a.href = url; + a.href = a.href; + a.protocol = a.protocol.replace("http", "ws"); + return a.href; + } else { + return url; + } + } + function createConsumer() { + var url = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : getConfig("url") || INTERNAL.default_mount_path; + return new Consumer(url); + } + function getConfig(name) { + var element = document.head.querySelector("meta[name='action-cable-" + name + "']"); + if (element) { + return element.getAttribute("content"); + } + } + exports.Connection = Connection; + exports.ConnectionMonitor = ConnectionMonitor; + exports.Consumer = Consumer; + exports.INTERNAL = INTERNAL; + exports.Subscription = Subscription; + exports.Subscriptions = Subscriptions; + exports.adapters = adapters; + exports.createWebSocketURL = createWebSocketURL; + exports.logger = logger; + exports.createConsumer = createConsumer; + exports.getConfig = getConfig; + Object.defineProperty(exports, "__esModule", { + value: true + }); +}); diff --git a/actioncable/app/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee deleted file mode 100644 index 7fd68cad2f..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/connection.coffee +++ /dev/null @@ -1,116 +0,0 @@ -#= require ./connection_monitor - -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types, protocols} = ActionCable.INTERNAL -[supportedProtocols..., unsupportedProtocol] = protocols - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - {@subscriptions} = @consumer - @monitor = new ActionCable.ConnectionMonitor this - @disconnected = true - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @isActive() - ActionCable.log("Attempted to open WebSocket, but existing socket is #{@getState()}") - false - else - ActionCable.log("Opening WebSocket, current state is #{@getState()}, subprotocols: #{protocols}") - @uninstallEventHandlers() if @webSocket? - @webSocket = new ActionCable.WebSocket(@consumer.url, protocols) - @installEventHandlers() - @monitor.start() - true - - close: ({allowReconnect} = {allowReconnect: true}) -> - @monitor.stop() unless allowReconnect - @webSocket?.close() if @isActive() - - reopen: -> - ActionCable.log("Reopening WebSocket, current state is #{@getState()}") - if @isActive() - try - @close() - catch error - ActionCable.log("Failed to reopen WebSocket", error) - finally - ActionCable.log("Reopening WebSocket in #{@constructor.reopenDelay}ms") - setTimeout(@open, @constructor.reopenDelay) - else - @open() - - getProtocol: -> - @webSocket?.protocol - - isOpen: -> - @isState("open") - - isActive: -> - @isState("open", "connecting") - - # Private - - isProtocolSupported: -> - @getProtocol() in supportedProtocols - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - uninstallEventHandlers: -> - for eventName of @events - @webSocket["on#{eventName}"] = -> - return - - events: - message: (event) -> - return unless @isProtocolSupported() - {identifier, message, type} = JSON.parse(event.data) - switch type - when message_types.welcome - @monitor.recordConnect() - @subscriptions.reload() - when message_types.ping - @monitor.recordPing() - when message_types.confirmation - @subscriptions.notify(identifier, "connected") - when message_types.rejection - @subscriptions.reject(identifier) - else - @subscriptions.notify(identifier, "received", message) - - open: -> - ActionCable.log("WebSocket onopen event, using '#{@getProtocol()}' subprotocol") - @disconnected = false - if not @isProtocolSupported() - ActionCable.log("Protocol is unsupported. Stopping monitor and disconnecting.") - @close(allowReconnect: false) - - close: (event) -> - ActionCable.log("WebSocket onclose event") - return if @disconnected - @disconnected = true - @monitor.recordDisconnect() - @subscriptions.notifyAll("disconnected", {willAttemptReconnect: @monitor.isRunning()}) - - error: -> - ActionCable.log("WebSocket onerror event") diff --git a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee deleted file mode 100644 index 0cc675fa94..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee +++ /dev/null @@ -1,95 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - constructor: (@connection) -> - @reconnectAttempts = 0 - - start: -> - unless @isRunning() - @startedAt = now() - delete @stoppedAt - @startPolling() - document.addEventListener("visibilitychange", @visibilityDidChange) - ActionCable.log("ConnectionMonitor started. pollInterval = #{@getPollInterval()} ms") - - stop: -> - if @isRunning() - @stoppedAt = now() - @stopPolling() - document.removeEventListener("visibilitychange", @visibilityDidChange) - ActionCable.log("ConnectionMonitor stopped") - - isRunning: -> - @startedAt? and not @stoppedAt? - - recordPing: -> - @pingedAt = now() - - recordConnect: -> - @reconnectAttempts = 0 - @recordPing() - delete @disconnectedAt - ActionCable.log("ConnectionMonitor recorded connect") - - recordDisconnect: -> - @disconnectedAt = now() - ActionCable.log("ConnectionMonitor recorded disconnect") - - # Private - - startPolling: -> - @stopPolling() - @poll() - - stopPolling: -> - clearTimeout(@pollTimeout) - - poll: -> - @pollTimeout = setTimeout => - @reconnectIfStale() - @poll() - , @getPollInterval() - - getPollInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - Math.round(clamp(interval, min, max) * 1000) - - reconnectIfStale: -> - if @connectionIsStale() - ActionCable.log("ConnectionMonitor detected stale connection. reconnectAttempts = #{@reconnectAttempts}, pollInterval = #{@getPollInterval()} ms, time disconnected = #{secondsSince(@disconnectedAt)} s, stale threshold = #{@constructor.staleThreshold} s") - @reconnectAttempts++ - if @disconnectedRecently() - ActionCable.log("ConnectionMonitor skipping reopening recent disconnect") - else - ActionCable.log("ConnectionMonitor reopening") - @connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @connection.isOpen() - ActionCable.log("ConnectionMonitor reopening stale connection on visibilitychange. visbilityState = #{document.visibilityState}") - @connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/app/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee deleted file mode 100644 index 3298be717f..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/consumer.coffee +++ /dev/null @@ -1,46 +0,0 @@ -#= require ./connection -#= require ./subscriptions -#= require ./subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -# -# When a consumer is created, it automatically connects with the server. -# -# To disconnect from the server, call -# -# App.cable.disconnect() -# -# and to restart the connection: -# -# App.cable.connect() -# -# Any channel subscriptions which existed prior to disconnecting will -# automatically resubscribe. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - - send: (data) -> - @connection.send(data) - - connect: -> - @connection.open() - - disconnect: -> - @connection.close(allowReconnect: false) - - ensureActiveConnection: -> - unless @connection.isActive() - @connection.open() diff --git a/actioncable/app/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee deleted file mode 100644 index 8e0805a174..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/subscription.coffee +++ /dev/null @@ -1,72 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# disconnected: ({ willAttemptReconnect: boolean }) -> -# # Called when the client has disconnected with the server. -# # The object will have an `willAttemptReconnect` property which -# # says whether the client has the intention of attempting -# # to reconnect. -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@consumer, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @consumer.subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee deleted file mode 100644 index aa052bf5d8..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee +++ /dev/null @@ -1,66 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - subscription = new ActionCable.Subscription @consumer, params, mixin - @add(subscription) - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @consumer.ensureActiveConnection() - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - subscription - - remove: (subscription) -> - @forget(subscription) - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - subscription - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - subscription - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - subscription - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - @consumer.send({command, identifier}) diff --git a/actioncable/app/javascript/action_cable/adapters.js b/actioncable/app/javascript/action_cable/adapters.js new file mode 100644 index 0000000000..4de8131438 --- /dev/null +++ b/actioncable/app/javascript/action_cable/adapters.js @@ -0,0 +1,4 @@ +export default { + logger: self.console, + WebSocket: self.WebSocket +} diff --git a/actioncable/app/javascript/action_cable/connection.js b/actioncable/app/javascript/action_cable/connection.js new file mode 100644 index 0000000000..96bac132c1 --- /dev/null +++ b/actioncable/app/javascript/action_cable/connection.js @@ -0,0 +1,165 @@ +import adapters from "./adapters" +import ConnectionMonitor from "./connection_monitor" +import INTERNAL from "./internal" +import logger from "./logger" + +// Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +const {message_types, protocols} = INTERNAL +const supportedProtocols = protocols.slice(0, protocols.length - 1) + +const indexOf = [].indexOf + +class Connection { + constructor(consumer) { + this.open = this.open.bind(this) + this.consumer = consumer + this.subscriptions = this.consumer.subscriptions + this.monitor = new ConnectionMonitor(this) + this.disconnected = true + } + + send(data) { + if (this.isOpen()) { + this.webSocket.send(JSON.stringify(data)) + return true + } else { + return false + } + } + + open() { + if (this.isActive()) { + logger.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`) + return false + } else { + logger.log(`Opening WebSocket, current state is ${this.getState()}, subprotocols: ${protocols}`) + if (this.webSocket) { this.uninstallEventHandlers() } + this.webSocket = new adapters.WebSocket(this.consumer.url, protocols) + this.installEventHandlers() + this.monitor.start() + return true + } + } + + close({allowReconnect} = {allowReconnect: true}) { + if (!allowReconnect) { this.monitor.stop() } + if (this.isActive()) { + return this.webSocket.close() + } + } + + reopen() { + logger.log(`Reopening WebSocket, current state is ${this.getState()}`) + if (this.isActive()) { + try { + return this.close() + } catch (error) { + logger.log("Failed to reopen WebSocket", error) + } + finally { + logger.log(`Reopening WebSocket in ${this.constructor.reopenDelay}ms`) + setTimeout(this.open, this.constructor.reopenDelay) + } + } else { + return this.open() + } + } + + getProtocol() { + if (this.webSocket) { + return this.webSocket.protocol + } + } + + isOpen() { + return this.isState("open") + } + + isActive() { + return this.isState("open", "connecting") + } + + // Private + + isProtocolSupported() { + return indexOf.call(supportedProtocols, this.getProtocol()) >= 0 + } + + isState(...states) { + return indexOf.call(states, this.getState()) >= 0 + } + + getState() { + if (this.webSocket) { + for (let state in adapters.WebSocket) { + if (adapters.WebSocket[state] === this.webSocket.readyState) { + return state.toLowerCase() + } + } + } + return null + } + + installEventHandlers() { + for (let eventName in this.events) { + const handler = this.events[eventName].bind(this) + this.webSocket[`on${eventName}`] = handler + } + } + + uninstallEventHandlers() { + for (let eventName in this.events) { + this.webSocket[`on${eventName}`] = function() {} + } + } + +} + +Connection.reopenDelay = 500 + +Connection.prototype.events = { + message(event) { + if (!this.isProtocolSupported()) { return } + const {identifier, message, reason, reconnect, type} = JSON.parse(event.data) + switch (type) { + case message_types.welcome: + this.monitor.recordConnect() + return this.subscriptions.reload() + case message_types.disconnect: + logger.log(`Disconnecting. Reason: ${reason}`) + return this.close({allowReconnect: reconnect}) + case message_types.ping: + return this.monitor.recordPing() + case message_types.confirmation: + return this.subscriptions.notify(identifier, "connected") + case message_types.rejection: + return this.subscriptions.reject(identifier) + default: + return this.subscriptions.notify(identifier, "received", message) + } + }, + + open() { + logger.log(`WebSocket onopen event, using '${this.getProtocol()}' subprotocol`) + this.disconnected = false + if (!this.isProtocolSupported()) { + logger.log("Protocol is unsupported. Stopping monitor and disconnecting.") + return this.close({allowReconnect: false}) + } + }, + + close(event) { + logger.log("WebSocket onclose event") + if (this.disconnected) { return } + this.disconnected = true + this.monitor.recordDisconnect() + return this.subscriptions.notifyAll("disconnected", {willAttemptReconnect: this.monitor.isRunning()}) + }, + + error() { + logger.log("WebSocket onerror event") + } +} + +export default Connection diff --git a/actioncable/app/javascript/action_cable/connection_monitor.js b/actioncable/app/javascript/action_cable/connection_monitor.js new file mode 100644 index 0000000000..312a71d154 --- /dev/null +++ b/actioncable/app/javascript/action_cable/connection_monitor.js @@ -0,0 +1,126 @@ +import logger from "./logger" + +// Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +// revival reconnections if things go astray. Internal class, not intended for direct user manipulation. + +const now = () => new Date().getTime() + +const secondsSince = time => (now() - time) / 1000 + +const clamp = (number, min, max) => Math.max(min, Math.min(max, number)) + +class ConnectionMonitor { + constructor(connection) { + this.visibilityDidChange = this.visibilityDidChange.bind(this) + this.connection = connection + this.reconnectAttempts = 0 + } + + start() { + if (!this.isRunning()) { + this.startedAt = now() + delete this.stoppedAt + this.startPolling() + addEventListener("visibilitychange", this.visibilityDidChange) + logger.log(`ConnectionMonitor started. pollInterval = ${this.getPollInterval()} ms`) + } + } + + stop() { + if (this.isRunning()) { + this.stoppedAt = now() + this.stopPolling() + removeEventListener("visibilitychange", this.visibilityDidChange) + logger.log("ConnectionMonitor stopped") + } + } + + isRunning() { + return this.startedAt && !this.stoppedAt + } + + recordPing() { + this.pingedAt = now() + } + + recordConnect() { + this.reconnectAttempts = 0 + this.recordPing() + delete this.disconnectedAt + logger.log("ConnectionMonitor recorded connect") + } + + recordDisconnect() { + this.disconnectedAt = now() + logger.log("ConnectionMonitor recorded disconnect") + } + + // Private + + startPolling() { + this.stopPolling() + this.poll() + } + + stopPolling() { + clearTimeout(this.pollTimeout) + } + + poll() { + this.pollTimeout = setTimeout(() => { + this.reconnectIfStale() + this.poll() + } + , this.getPollInterval()) + } + + getPollInterval() { + const {min, max, multiplier} = this.constructor.pollInterval + const interval = multiplier * Math.log(this.reconnectAttempts + 1) + return Math.round(clamp(interval, min, max) * 1000) + } + + reconnectIfStale() { + if (this.connectionIsStale()) { + logger.log(`ConnectionMonitor detected stale connection. reconnectAttempts = ${this.reconnectAttempts}, pollInterval = ${this.getPollInterval()} ms, time disconnected = ${secondsSince(this.disconnectedAt)} s, stale threshold = ${this.constructor.staleThreshold} s`) + this.reconnectAttempts++ + if (this.disconnectedRecently()) { + logger.log("ConnectionMonitor skipping reopening recent disconnect") + } else { + logger.log("ConnectionMonitor reopening") + this.connection.reopen() + } + } + } + + connectionIsStale() { + return secondsSince(this.pingedAt ? this.pingedAt : this.startedAt) > this.constructor.staleThreshold + } + + disconnectedRecently() { + return this.disconnectedAt && (secondsSince(this.disconnectedAt) < this.constructor.staleThreshold) + } + + visibilityDidChange() { + if (document.visibilityState === "visible") { + setTimeout(() => { + if (this.connectionIsStale() || !this.connection.isOpen()) { + logger.log(`ConnectionMonitor reopening stale connection on visibilitychange. visbilityState = ${document.visibilityState}`) + this.connection.reopen() + } + } + , 200) + } + } + +} + +ConnectionMonitor.pollInterval = { + min: 3, + max: 30, + multiplier: 5 +} + +ConnectionMonitor.staleThreshold = 6 // Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + +export default ConnectionMonitor diff --git a/actioncable/app/javascript/action_cable/consumer.js b/actioncable/app/javascript/action_cable/consumer.js new file mode 100644 index 0000000000..e2e0dea8b5 --- /dev/null +++ b/actioncable/app/javascript/action_cable/consumer.js @@ -0,0 +1,75 @@ +import Connection from "./connection" +import Subscriptions from "./subscriptions" + +// The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +// the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +// The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +// method. +// +// The following example shows how this can be setup: +// +// App = {} +// App.cable = ActionCable.createConsumer("ws://example.com/accounts/1") +// App.appearance = App.cable.subscriptions.create("AppearanceChannel") +// +// For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +// +// When a consumer is created, it automatically connects with the server. +// +// To disconnect from the server, call +// +// App.cable.disconnect() +// +// and to restart the connection: +// +// App.cable.connect() +// +// Any channel subscriptions which existed prior to disconnecting will +// automatically resubscribe. + +export default class Consumer { + constructor(url) { + this._url = url + this.subscriptions = new Subscriptions(this) + this.connection = new Connection(this) + } + + get url() { + return createWebSocketURL(this._url) + } + + send(data) { + return this.connection.send(data) + } + + connect() { + return this.connection.open() + } + + disconnect() { + return this.connection.close({allowReconnect: false}) + } + + ensureActiveConnection() { + if (!this.connection.isActive()) { + return this.connection.open() + } + } +} + +export function createWebSocketURL(url) { + if (typeof url === "function") { + url = url() + } + + if (url && !/^wss?:/i.test(url)) { + const a = document.createElement("a") + a.href = url + // Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + return a.href + } else { + return url + } +} diff --git a/actioncable/app/javascript/action_cable/index.js b/actioncable/app/javascript/action_cable/index.js new file mode 100644 index 0000000000..848b5631d6 --- /dev/null +++ b/actioncable/app/javascript/action_cable/index.js @@ -0,0 +1,31 @@ +import Connection from "./connection" +import ConnectionMonitor from "./connection_monitor" +import Consumer, { createWebSocketURL } from "./consumer" +import INTERNAL from "./internal" +import Subscription from "./subscription" +import Subscriptions from "./subscriptions" +import adapters from "./adapters" +import logger from "./logger" + +export { + Connection, + ConnectionMonitor, + Consumer, + INTERNAL, + Subscription, + Subscriptions, + adapters, + createWebSocketURL, + logger, +} + +export function createConsumer(url = getConfig("url") || INTERNAL.default_mount_path) { + return new Consumer(url) +} + +export function getConfig(name) { + const element = document.head.querySelector(`meta[name='action-cable-${name}']`) + if (element) { + return element.getAttribute("content") + } +} diff --git a/actioncable/app/javascript/action_cable/logger.js b/actioncable/app/javascript/action_cable/logger.js new file mode 100644 index 0000000000..ef4661ead1 --- /dev/null +++ b/actioncable/app/javascript/action_cable/logger.js @@ -0,0 +1,10 @@ +import adapters from "./adapters" + +export default { + log(...messages) { + if (this.enabled) { + messages.push(Date.now()) + adapters.logger.log("[ActionCable]", ...messages) + } + }, +} diff --git a/actioncable/app/javascript/action_cable/subscription.js b/actioncable/app/javascript/action_cable/subscription.js new file mode 100644 index 0000000000..7de08f93b3 --- /dev/null +++ b/actioncable/app/javascript/action_cable/subscription.js @@ -0,0 +1,89 @@ +// A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +// It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +// Channel instance on the server side. +// +// An example demonstrates the basic functionality: +// +// App.appearance = App.cable.subscriptions.create("AppearanceChannel", { +// connected() { +// // Called once the subscription has been successfully completed +// }, +// +// disconnected({ willAttemptReconnect: boolean }) { +// // Called when the client has disconnected with the server. +// // The object will have an `willAttemptReconnect` property which +// // says whether the client has the intention of attempting +// // to reconnect. +// }, +// +// appear() { +// this.perform('appear', {appearing_on: this.appearingOn()}) +// }, +// +// away() { +// this.perform('away') +// }, +// +// appearingOn() { +// $('main').data('appearing-on') +// } +// }) +// +// The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +// by calling the `perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +// The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +// +// This is how the server component would look: +// +// class AppearanceChannel < ApplicationActionCable::Channel +// def subscribed +// current_user.appear +// end +// +// def unsubscribed +// current_user.disappear +// end +// +// def appear(data) +// current_user.appear on: data['appearing_on'] +// end +// +// def away +// current_user.away +// end +// end +// +// The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +// The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the perform method. + +const extend = function(object, properties) { + if (properties != null) { + for (let key in properties) { + const value = properties[key] + object[key] = value + } + } + return object +} + +export default class Subscription { + constructor(consumer, params = {}, mixin) { + this.consumer = consumer + this.identifier = JSON.stringify(params) + extend(this, mixin) + } + + // Perform a channel action with the optional data passed as an attribute + perform(action, data = {}) { + data.action = action + return this.send(data) + } + + send(data) { + return this.consumer.send({command: "message", identifier: this.identifier, data: JSON.stringify(data)}) + } + + unsubscribe() { + return this.consumer.subscriptions.remove(this) + } +} diff --git a/actioncable/app/javascript/action_cable/subscriptions.js b/actioncable/app/javascript/action_cable/subscriptions.js new file mode 100644 index 0000000000..867cafb407 --- /dev/null +++ b/actioncable/app/javascript/action_cable/subscriptions.js @@ -0,0 +1,86 @@ +import Subscription from "./subscription" + +// Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +// us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +// +// App = {} +// App.cable = ActionCable.createConsumer("ws://example.com/accounts/1") +// App.appearance = App.cable.subscriptions.create("AppearanceChannel") +// +// For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. + +export default class Subscriptions { + constructor(consumer) { + this.consumer = consumer + this.subscriptions = [] + } + + create(channelName, mixin) { + const channel = channelName + const params = typeof channel === "object" ? channel : {channel} + const subscription = new Subscription(this.consumer, params, mixin) + return this.add(subscription) + } + + // Private + + add(subscription) { + this.subscriptions.push(subscription) + this.consumer.ensureActiveConnection() + this.notify(subscription, "initialized") + this.sendCommand(subscription, "subscribe") + return subscription + } + + remove(subscription) { + this.forget(subscription) + if (!this.findAll(subscription.identifier).length) { + this.sendCommand(subscription, "unsubscribe") + } + return subscription + } + + reject(identifier) { + return this.findAll(identifier).map((subscription) => { + this.forget(subscription) + this.notify(subscription, "rejected") + return subscription + }) + } + + forget(subscription) { + this.subscriptions = (this.subscriptions.filter((s) => s !== subscription)) + return subscription + } + + findAll(identifier) { + return this.subscriptions.filter((s) => s.identifier === identifier) + } + + reload() { + return this.subscriptions.map((subscription) => + this.sendCommand(subscription, "subscribe")) + } + + notifyAll(callbackName, ...args) { + return this.subscriptions.map((subscription) => + this.notify(subscription, callbackName, ...args)) + } + + notify(subscription, callbackName, ...args) { + let subscriptions + if (typeof subscription === "string") { + subscriptions = this.findAll(subscription) + } else { + subscriptions = [subscription] + } + + return subscriptions.map((subscription) => + (typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)) + } + + sendCommand(subscription, command) { + const {identifier} = subscription + return this.consumer.send({command, identifier}) + } +} |