diff options
Diffstat (limited to 'actioncable/app/javascript/action_cable')
8 files changed, 577 insertions, 0 deletions
diff --git a/actioncable/app/javascript/action_cable/adapters.js b/actioncable/app/javascript/action_cable/adapters.js new file mode 100644 index 0000000000..4de8131438 --- /dev/null +++ b/actioncable/app/javascript/action_cable/adapters.js @@ -0,0 +1,4 @@ +export default { + logger: self.console, + WebSocket: self.WebSocket +} diff --git a/actioncable/app/javascript/action_cable/connection.js b/actioncable/app/javascript/action_cable/connection.js new file mode 100644 index 0000000000..96bac132c1 --- /dev/null +++ b/actioncable/app/javascript/action_cable/connection.js @@ -0,0 +1,165 @@ +import adapters from "./adapters" +import ConnectionMonitor from "./connection_monitor" +import INTERNAL from "./internal" +import logger from "./logger" + +// Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +const {message_types, protocols} = INTERNAL +const supportedProtocols = protocols.slice(0, protocols.length - 1) + +const indexOf = [].indexOf + +class Connection { + constructor(consumer) { + this.open = this.open.bind(this) + this.consumer = consumer + this.subscriptions = this.consumer.subscriptions + this.monitor = new ConnectionMonitor(this) + this.disconnected = true + } + + send(data) { + if (this.isOpen()) { + this.webSocket.send(JSON.stringify(data)) + return true + } else { + return false + } + } + + open() { + if (this.isActive()) { + logger.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`) + return false + } else { + logger.log(`Opening WebSocket, current state is ${this.getState()}, subprotocols: ${protocols}`) + if (this.webSocket) { this.uninstallEventHandlers() } + this.webSocket = new adapters.WebSocket(this.consumer.url, protocols) + this.installEventHandlers() + this.monitor.start() + return true + } + } + + close({allowReconnect} = {allowReconnect: true}) { + if (!allowReconnect) { this.monitor.stop() } + if (this.isActive()) { + return this.webSocket.close() + } + } + + reopen() { + logger.log(`Reopening WebSocket, current state is ${this.getState()}`) + if (this.isActive()) { + try { + return this.close() + } catch (error) { + logger.log("Failed to reopen WebSocket", error) + } + finally { + logger.log(`Reopening WebSocket in ${this.constructor.reopenDelay}ms`) + setTimeout(this.open, this.constructor.reopenDelay) + } + } else { + return this.open() + } + } + + getProtocol() { + if (this.webSocket) { + return this.webSocket.protocol + } + } + + isOpen() { + return this.isState("open") + } + + isActive() { + return this.isState("open", "connecting") + } + + // Private + + isProtocolSupported() { + return indexOf.call(supportedProtocols, this.getProtocol()) >= 0 + } + + isState(...states) { + return indexOf.call(states, this.getState()) >= 0 + } + + getState() { + if (this.webSocket) { + for (let state in adapters.WebSocket) { + if (adapters.WebSocket[state] === this.webSocket.readyState) { + return state.toLowerCase() + } + } + } + return null + } + + installEventHandlers() { + for (let eventName in this.events) { + const handler = this.events[eventName].bind(this) + this.webSocket[`on${eventName}`] = handler + } + } + + uninstallEventHandlers() { + for (let eventName in this.events) { + this.webSocket[`on${eventName}`] = function() {} + } + } + +} + +Connection.reopenDelay = 500 + +Connection.prototype.events = { + message(event) { + if (!this.isProtocolSupported()) { return } + const {identifier, message, reason, reconnect, type} = JSON.parse(event.data) + switch (type) { + case message_types.welcome: + this.monitor.recordConnect() + return this.subscriptions.reload() + case message_types.disconnect: + logger.log(`Disconnecting. Reason: ${reason}`) + return this.close({allowReconnect: reconnect}) + case message_types.ping: + return this.monitor.recordPing() + case message_types.confirmation: + return this.subscriptions.notify(identifier, "connected") + case message_types.rejection: + return this.subscriptions.reject(identifier) + default: + return this.subscriptions.notify(identifier, "received", message) + } + }, + + open() { + logger.log(`WebSocket onopen event, using '${this.getProtocol()}' subprotocol`) + this.disconnected = false + if (!this.isProtocolSupported()) { + logger.log("Protocol is unsupported. Stopping monitor and disconnecting.") + return this.close({allowReconnect: false}) + } + }, + + close(event) { + logger.log("WebSocket onclose event") + if (this.disconnected) { return } + this.disconnected = true + this.monitor.recordDisconnect() + return this.subscriptions.notifyAll("disconnected", {willAttemptReconnect: this.monitor.isRunning()}) + }, + + error() { + logger.log("WebSocket onerror event") + } +} + +export default Connection diff --git a/actioncable/app/javascript/action_cable/connection_monitor.js b/actioncable/app/javascript/action_cable/connection_monitor.js new file mode 100644 index 0000000000..312a71d154 --- /dev/null +++ b/actioncable/app/javascript/action_cable/connection_monitor.js @@ -0,0 +1,126 @@ +import logger from "./logger" + +// Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +// revival reconnections if things go astray. Internal class, not intended for direct user manipulation. + +const now = () => new Date().getTime() + +const secondsSince = time => (now() - time) / 1000 + +const clamp = (number, min, max) => Math.max(min, Math.min(max, number)) + +class ConnectionMonitor { + constructor(connection) { + this.visibilityDidChange = this.visibilityDidChange.bind(this) + this.connection = connection + this.reconnectAttempts = 0 + } + + start() { + if (!this.isRunning()) { + this.startedAt = now() + delete this.stoppedAt + this.startPolling() + addEventListener("visibilitychange", this.visibilityDidChange) + logger.log(`ConnectionMonitor started. pollInterval = ${this.getPollInterval()} ms`) + } + } + + stop() { + if (this.isRunning()) { + this.stoppedAt = now() + this.stopPolling() + removeEventListener("visibilitychange", this.visibilityDidChange) + logger.log("ConnectionMonitor stopped") + } + } + + isRunning() { + return this.startedAt && !this.stoppedAt + } + + recordPing() { + this.pingedAt = now() + } + + recordConnect() { + this.reconnectAttempts = 0 + this.recordPing() + delete this.disconnectedAt + logger.log("ConnectionMonitor recorded connect") + } + + recordDisconnect() { + this.disconnectedAt = now() + logger.log("ConnectionMonitor recorded disconnect") + } + + // Private + + startPolling() { + this.stopPolling() + this.poll() + } + + stopPolling() { + clearTimeout(this.pollTimeout) + } + + poll() { + this.pollTimeout = setTimeout(() => { + this.reconnectIfStale() + this.poll() + } + , this.getPollInterval()) + } + + getPollInterval() { + const {min, max, multiplier} = this.constructor.pollInterval + const interval = multiplier * Math.log(this.reconnectAttempts + 1) + return Math.round(clamp(interval, min, max) * 1000) + } + + reconnectIfStale() { + if (this.connectionIsStale()) { + logger.log(`ConnectionMonitor detected stale connection. reconnectAttempts = ${this.reconnectAttempts}, pollInterval = ${this.getPollInterval()} ms, time disconnected = ${secondsSince(this.disconnectedAt)} s, stale threshold = ${this.constructor.staleThreshold} s`) + this.reconnectAttempts++ + if (this.disconnectedRecently()) { + logger.log("ConnectionMonitor skipping reopening recent disconnect") + } else { + logger.log("ConnectionMonitor reopening") + this.connection.reopen() + } + } + } + + connectionIsStale() { + return secondsSince(this.pingedAt ? this.pingedAt : this.startedAt) > this.constructor.staleThreshold + } + + disconnectedRecently() { + return this.disconnectedAt && (secondsSince(this.disconnectedAt) < this.constructor.staleThreshold) + } + + visibilityDidChange() { + if (document.visibilityState === "visible") { + setTimeout(() => { + if (this.connectionIsStale() || !this.connection.isOpen()) { + logger.log(`ConnectionMonitor reopening stale connection on visibilitychange. visbilityState = ${document.visibilityState}`) + this.connection.reopen() + } + } + , 200) + } + } + +} + +ConnectionMonitor.pollInterval = { + min: 3, + max: 30, + multiplier: 5 +} + +ConnectionMonitor.staleThreshold = 6 // Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + +export default ConnectionMonitor diff --git a/actioncable/app/javascript/action_cable/consumer.js b/actioncable/app/javascript/action_cable/consumer.js new file mode 100644 index 0000000000..e8440f39f5 --- /dev/null +++ b/actioncable/app/javascript/action_cable/consumer.js @@ -0,0 +1,54 @@ +import Connection from "./connection" +import Subscriptions from "./subscriptions" + +// The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +// the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +// The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +// method. +// +// The following example shows how this can be setup: +// +// App = {} +// App.cable = ActionCable.createConsumer("ws://example.com/accounts/1") +// App.appearance = App.cable.subscriptions.create("AppearanceChannel") +// +// For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +// +// When a consumer is created, it automatically connects with the server. +// +// To disconnect from the server, call +// +// App.cable.disconnect() +// +// and to restart the connection: +// +// App.cable.connect() +// +// Any channel subscriptions which existed prior to disconnecting will +// automatically resubscribe. + +export default class Consumer { + constructor(url) { + this.url = url + this.subscriptions = new Subscriptions(this) + this.connection = new Connection(this) + } + + send(data) { + return this.connection.send(data) + } + + connect() { + return this.connection.open() + } + + disconnect() { + return this.connection.close({allowReconnect: false}) + } + + ensureActiveConnection() { + if (!this.connection.isActive()) { + return this.connection.open() + } + } +} diff --git a/actioncable/app/javascript/action_cable/index.js b/actioncable/app/javascript/action_cable/index.js new file mode 100644 index 0000000000..659418396f --- /dev/null +++ b/actioncable/app/javascript/action_cable/index.js @@ -0,0 +1,43 @@ +import Connection from "./connection" +import ConnectionMonitor from "./connection_monitor" +import Consumer from "./consumer" +import INTERNAL from "./internal" +import Subscription from "./subscription" +import Subscriptions from "./subscriptions" +import adapters from "./adapters" +import logger from "./logger" + +export { + Connection, + ConnectionMonitor, + Consumer, + INTERNAL, + Subscription, + Subscriptions, + adapters, + logger, +} + +export function createConsumer(url = getConfig("url") || INTERNAL.default_mount_path) { + return new Consumer(createWebSocketURL(url)) +} + +export function getConfig(name) { + const element = document.head.querySelector(`meta[name='action-cable-${name}']`) + if (element) { + return element.getAttribute("content") + } +} + +export function createWebSocketURL(url) { + if (url && !/^wss?:/i.test(url)) { + const a = document.createElement("a") + a.href = url + // Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + return a.href + } else { + return url + } +} diff --git a/actioncable/app/javascript/action_cable/logger.js b/actioncable/app/javascript/action_cable/logger.js new file mode 100644 index 0000000000..ef4661ead1 --- /dev/null +++ b/actioncable/app/javascript/action_cable/logger.js @@ -0,0 +1,10 @@ +import adapters from "./adapters" + +export default { + log(...messages) { + if (this.enabled) { + messages.push(Date.now()) + adapters.logger.log("[ActionCable]", ...messages) + } + }, +} diff --git a/actioncable/app/javascript/action_cable/subscription.js b/actioncable/app/javascript/action_cable/subscription.js new file mode 100644 index 0000000000..7de08f93b3 --- /dev/null +++ b/actioncable/app/javascript/action_cable/subscription.js @@ -0,0 +1,89 @@ +// A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +// It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +// Channel instance on the server side. +// +// An example demonstrates the basic functionality: +// +// App.appearance = App.cable.subscriptions.create("AppearanceChannel", { +// connected() { +// // Called once the subscription has been successfully completed +// }, +// +// disconnected({ willAttemptReconnect: boolean }) { +// // Called when the client has disconnected with the server. +// // The object will have an `willAttemptReconnect` property which +// // says whether the client has the intention of attempting +// // to reconnect. +// }, +// +// appear() { +// this.perform('appear', {appearing_on: this.appearingOn()}) +// }, +// +// away() { +// this.perform('away') +// }, +// +// appearingOn() { +// $('main').data('appearing-on') +// } +// }) +// +// The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +// by calling the `perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +// The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +// +// This is how the server component would look: +// +// class AppearanceChannel < ApplicationActionCable::Channel +// def subscribed +// current_user.appear +// end +// +// def unsubscribed +// current_user.disappear +// end +// +// def appear(data) +// current_user.appear on: data['appearing_on'] +// end +// +// def away +// current_user.away +// end +// end +// +// The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +// The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the perform method. + +const extend = function(object, properties) { + if (properties != null) { + for (let key in properties) { + const value = properties[key] + object[key] = value + } + } + return object +} + +export default class Subscription { + constructor(consumer, params = {}, mixin) { + this.consumer = consumer + this.identifier = JSON.stringify(params) + extend(this, mixin) + } + + // Perform a channel action with the optional data passed as an attribute + perform(action, data = {}) { + data.action = action + return this.send(data) + } + + send(data) { + return this.consumer.send({command: "message", identifier: this.identifier, data: JSON.stringify(data)}) + } + + unsubscribe() { + return this.consumer.subscriptions.remove(this) + } +} diff --git a/actioncable/app/javascript/action_cable/subscriptions.js b/actioncable/app/javascript/action_cable/subscriptions.js new file mode 100644 index 0000000000..867cafb407 --- /dev/null +++ b/actioncable/app/javascript/action_cable/subscriptions.js @@ -0,0 +1,86 @@ +import Subscription from "./subscription" + +// Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +// us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +// +// App = {} +// App.cable = ActionCable.createConsumer("ws://example.com/accounts/1") +// App.appearance = App.cable.subscriptions.create("AppearanceChannel") +// +// For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. + +export default class Subscriptions { + constructor(consumer) { + this.consumer = consumer + this.subscriptions = [] + } + + create(channelName, mixin) { + const channel = channelName + const params = typeof channel === "object" ? channel : {channel} + const subscription = new Subscription(this.consumer, params, mixin) + return this.add(subscription) + } + + // Private + + add(subscription) { + this.subscriptions.push(subscription) + this.consumer.ensureActiveConnection() + this.notify(subscription, "initialized") + this.sendCommand(subscription, "subscribe") + return subscription + } + + remove(subscription) { + this.forget(subscription) + if (!this.findAll(subscription.identifier).length) { + this.sendCommand(subscription, "unsubscribe") + } + return subscription + } + + reject(identifier) { + return this.findAll(identifier).map((subscription) => { + this.forget(subscription) + this.notify(subscription, "rejected") + return subscription + }) + } + + forget(subscription) { + this.subscriptions = (this.subscriptions.filter((s) => s !== subscription)) + return subscription + } + + findAll(identifier) { + return this.subscriptions.filter((s) => s.identifier === identifier) + } + + reload() { + return this.subscriptions.map((subscription) => + this.sendCommand(subscription, "subscribe")) + } + + notifyAll(callbackName, ...args) { + return this.subscriptions.map((subscription) => + this.notify(subscription, callbackName, ...args)) + } + + notify(subscription, callbackName, ...args) { + let subscriptions + if (typeof subscription === "string") { + subscriptions = this.findAll(subscription) + } else { + subscriptions = [subscription] + } + + return subscriptions.map((subscription) => + (typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)) + } + + sendCommand(subscription, command) { + const {identifier} = subscription + return this.consumer.send({command, identifier}) + } +} |