From c7f00661bf0cc54a73ccdb9d27fa10b0fd806e43 Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Thu, 25 Jun 2015 10:21:53 -0400 Subject: Move connection and subscriber code into their own classes --- lib/assets/javascripts/cable.js.coffee | 134 ++------------------- lib/assets/javascripts/cable/channel.js.coffee | 22 ++++ lib/assets/javascripts/cable/connection.js.coffee | 102 ++++++++++++++++ .../javascripts/cable/subscriber_manager.js.coffee | 26 ++++ lib/assets/javascripts/channel.js.coffee | 25 ---- 5 files changed, 160 insertions(+), 149 deletions(-) create mode 100644 lib/assets/javascripts/cable/channel.js.coffee create mode 100644 lib/assets/javascripts/cable/connection.js.coffee create mode 100644 lib/assets/javascripts/cable/subscriber_manager.js.coffee delete mode 100644 lib/assets/javascripts/channel.js.coffee (limited to 'lib') diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee index 9fc269f994..4c93f8f062 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.js.coffee @@ -1,134 +1,20 @@ #= require_self -#= require_tree . +#= require cable/subscriber_manager +#= require cable/connection +#= require cable/channel class @Cable - MAX_CONNECTION_INTERVAL: 5 * 1000 - PING_STALE_INTERVAL: 8 - - constructor: (@cableUrl) -> - @subscribers = {} - @resetPingTime() - @resetConnectionAttemptsCount() - @connect() - - connect: -> - @connection = @createConnection() - - createConnection: -> - connection = new WebSocket(@cableUrl) - connection.onmessage = @onMessage - connection.onopen = @onConnect - connection.onclose = @onClose - connection.onerror = @onError - connection + constructor: (@url) -> + @subscribers = new Cable.SubscriberManager this + @connection = new Cable.Connection this createChannel: (channelName, mixin) -> channel = channelName params = if typeof channel is "object" then channel else {channel} new Cable.Channel this, params, mixin - isConnected: => - @connection?.readyState is 1 - - sendMessage: (identifier, data) => - if @isConnected() - @connection.send JSON.stringify { command: 'message', identifier: identifier, data: data } - - onMessage: (message) => - data = JSON.parse message.data - - if data.identifier is '_ping' - @pingReceived(data.message) - else - @subscribers[data.identifier]?.onMessage?(data.message) - - onConnect: => - @startWaitingForPing() - @resetConnectionAttemptsCount() - - for identifier, subscriber of @subscribers - @subscribeOnServer(identifier) - subscriber.onConnect?() - - onClose: => - @reconnect() - - onError: => - @reconnect() - - disconnect: -> - @removeExistingConnection() - @resetPingTime() - for identifier, subscriber of @subscribers - subscriber.onDisconnect?() - - reconnect: -> - @disconnect() - - setTimeout => - @incrementConnectionAttemptsCount() - @connect() - , @generateReconnectInterval() - - removeExistingConnection: => - if @connection? - @clearPingWaitTimeout() - - @connection.onclose = -> # no-op - @connection.onerror = -> # no-op - @connection.close() - @connection = null - - resetConnectionAttemptsCount: => - @connectionAttempts = 1 - - incrementConnectionAttemptsCount: => - @connectionAttempts += 1 - - generateReconnectInterval: () -> - interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 - if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval - - startWaitingForPing: => - @clearPingWaitTimeout() - - @waitForPingTimeout = setTimeout => - console.log "Ping took too long to arrive. Reconnecting.." - @reconnect() - , @PING_STALE_INTERVAL * 1000 - - clearPingWaitTimeout: => - clearTimeout(@waitForPingTimeout) - - resetPingTime: => - @lastPingTime = null - - giveUp: => - # Show an error message - - subscribe: (identifier, subscriber) => - @subscribers[identifier] = subscriber - - if @isConnected() - @subscribeOnServer(identifier) - subscriber.onConnect?() - - unsubscribe: (identifier) => - @unsubscribeOnServer(identifier) - delete @subscribers[identifier] - - subscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'subscribe', identifier: identifier } - - unsubscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier } + sendMessage: (identifier, data) -> + @sendCommand(identifier, "message", data) - pingReceived: (timestamp) => - if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL - console.log "Websocket connection is stale. Reconnecting.." - @reconnect() - else - @startWaitingForPing() - @lastPingTime = timestamp + sendCommand: (identifier, command, data) -> + @connection.send({identifier, command, data}) diff --git a/lib/assets/javascripts/cable/channel.js.coffee b/lib/assets/javascripts/cable/channel.js.coffee new file mode 100644 index 0000000000..645a44e140 --- /dev/null +++ b/lib/assets/javascripts/cable/channel.js.coffee @@ -0,0 +1,22 @@ +class Cable.Channel + constructor: (@cable, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @cable.subscribers.add(@identifier, this) + + # Perform a channel action with the optional data passed as an attribute + sendAction: (action, data = {}) -> + data.action = action + @sendMessage(data) + + sendMessage: (data) -> + @cable.sendMessage(@identifier, JSON.stringify(data)) + + unsubscribe: -> + @cable.subscribers.remove(@identifier) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee new file mode 100644 index 0000000000..a318925b97 --- /dev/null +++ b/lib/assets/javascripts/cable/connection.js.coffee @@ -0,0 +1,102 @@ +class Cable.Connection + MAX_CONNECTION_INTERVAL: 5 * 1000 + PING_STALE_INTERVAL: 8 + + constructor: (@cable) -> + @resetPingTime() + @resetConnectionAttemptsCount() + @connect() + + send: (data) -> + if @isConnected() + @websocket.send(JSON.stringify(data)) + true + else + false + + connect: -> + @websocket = @createWebSocket() + + createWebSocket: -> + ws = new WebSocket(@cable.url) + ws.onmessage = @onMessage + ws.onopen = @onConnect + ws.onclose = @onClose + ws.onerror = @onError + ws + + onMessage: (message) => + data = JSON.parse message.data + + if data.identifier is '_ping' + @pingReceived(data.message) + else + @cable.subscribers.notify(data.identifier, "onMessage", data.message) + + onConnect: => + @startWaitingForPing() + @resetConnectionAttemptsCount() + @cable.subscribers.reload() + + onClose: => + @reconnect() + + onError: => + @reconnect() + + isConnected: -> + @websocket?.readyState is 1 + + disconnect: -> + @removeExistingConnection() + @resetPingTime() + @cable.subscribers.notifyAll("onDisconnect") + + reconnect: -> + @disconnect() + + setTimeout => + @incrementConnectionAttemptsCount() + @connect() + , @generateReconnectInterval() + + removeExistingConnection: -> + if @websocket? + @clearPingWaitTimeout() + + @websocket.onclose = -> # no-op + @websocket.onerror = -> # no-op + @websocket.close() + @websocket = null + + resetConnectionAttemptsCount: -> + @connectionAttempts = 1 + + incrementConnectionAttemptsCount: -> + @connectionAttempts += 1 + + generateReconnectInterval: () -> + interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 + if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval + + startWaitingForPing: -> + @clearPingWaitTimeout() + + @waitForPingTimeout = setTimeout => + console.log "Ping took too long to arrive. Reconnecting.." + @reconnect() + , @PING_STALE_INTERVAL * 1000 + + clearPingWaitTimeout: -> + clearTimeout(@waitForPingTimeout) + + resetPingTime: -> + @lastPingTime = null + + pingReceived: (timestamp) -> + if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL + console.log "Websocket connection is stale. Reconnecting.." + @reconnect() + else + @startWaitingForPing() + @lastPingTime = timestamp diff --git a/lib/assets/javascripts/cable/subscriber_manager.js.coffee b/lib/assets/javascripts/cable/subscriber_manager.js.coffee new file mode 100644 index 0000000000..4f46efe817 --- /dev/null +++ b/lib/assets/javascripts/cable/subscriber_manager.js.coffee @@ -0,0 +1,26 @@ +class Cable.SubscriberManager + constructor: (@cable) -> + @subscribers = {} + + add: (identifier, subscriber) -> + @subscribers[identifier] = subscriber + if @cable.sendCommand(identifier, "subscribe") + @notify(identifier, "onConnect") + + reload: -> + for identifier in Object.keys(@subscribers) + if @cable.sendCommand(identifier, "subscribe") + @notify(identifier, "onConnect") + + remove: (identifier) -> + if subscriber = @subscribers[identifier] + @cable.sendCommand(identifier, "unsubscribe") + delete @subscribers[identifier] + + notifyAll: (event, args...) -> + for identifier in Object.keys(@subscribers) + @notify(identifier, event, args...) + + notify: (identifier, event, args...) -> + if subscriber = @subscribers[identifier] + subscriber[event]?(args...) diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee deleted file mode 100644 index 5196d5e03f..0000000000 --- a/lib/assets/javascripts/channel.js.coffee +++ /dev/null @@ -1,25 +0,0 @@ -class @Cable.Channel - constructor: (@cable, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscribe(@identifier, this) - - # Perform a channel action with the optional data passed as an attribute - sendAction: (action, data = {}) -> - data.action = action - @sendMessage(data) - - sendMessage: (data) -> - @cable.sendMessage(@identifier, JSON.stringify(data)) - - subscribe: -> - @cable.subscribe(@identifier, this) - - unsubscribe: -> - @cable.unsubscribe(@identifier) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object -- cgit v1.2.3