diff options
Diffstat (limited to 'lib/assets')
-rw-r--r-- | lib/assets/javascripts/cable.js.coffee | 126 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/connection.js.coffee | 53 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/connection_monitor.js.coffee | 65 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/consumer.js.coffee | 18 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/subscriber_manager.js.coffee | 38 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/subscription.js.coffee | 22 | ||||
-rw-r--r-- | lib/assets/javascripts/channel.js.coffee | 34 |
7 files changed, 201 insertions, 155 deletions
diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee index 7c033d3b08..0bd1757505 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.js.coffee @@ -1,124 +1,8 @@ #= require_self -#= require_tree . +#= require cable/consumer -class @Cable - MAX_CONNECTION_INTERVAL: 5 * 1000 - PING_STALE_INTERVAL: 8 +@Cable = + PING_IDENTIFIER: "_ping" - constructor: (@cableUrl) -> - @subscribers = {} - @resetPingTime() - @resetConnectionAttemptsCount() - @connect() - - connect: -> - @connection = @createConnection() - - createConnection: -> - connection = new WebSocket(@cableUrl) - connection.onmessage = @receiveData - connection.onopen = @connected - connection.onclose = @reconnect - - connection.onerror = @reconnect - connection - - isConnected: => - @connection?.readyState is 1 - - sendData: (identifier, data) => - if @isConnected() - @connection.send JSON.stringify { command: 'message', identifier: identifier, data: data } - - receiveData: (message) => - data = JSON.parse message.data - - if data.identifier is '_ping' - @pingReceived(data.message) - else - @subscribers[data.identifier]?.onReceiveData(data.message) - - connected: => - @startWaitingForPing() - @resetConnectionAttemptsCount() - - for identifier, callbacks of @subscribers - @subscribeOnServer(identifier) - callbacks['onConnect']?() - - reconnect: => - @removeExistingConnection() - - @resetPingTime() - @disconnected() - - setTimeout => - @incrementConnectionAttemptsCount() - @connect() - , @generateReconnectInterval() - - removeExistingConnection: => - if @connection? - @clearPingWaitTimeout() - - @connection.onclose = -> # no-op - @connection.onerror = -> # no-op - @connection.close() - @connection = null - - resetConnectionAttemptsCount: => - @connectionAttempts = 1 - - incrementConnectionAttemptsCount: => - @connectionAttempts += 1 - - generateReconnectInterval: () -> - interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 - if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval - - startWaitingForPing: => - @clearPingWaitTimeout() - - @waitForPingTimeout = setTimeout => - console.log "Ping took too long to arrive. Reconnecting.." - @reconnect() - , @PING_STALE_INTERVAL * 1000 - - clearPingWaitTimeout: => - clearTimeout(@waitForPingTimeout) - - resetPingTime: => - @lastPingTime = null - - disconnected: => - callbacks['onDisconnect']?() for identifier, callbacks of @subscribers - - giveUp: => - # Show an error message - - subscribe: (identifier, callbacks) => - @subscribers[identifier] = callbacks - - if @isConnected() - @subscribeOnServer(identifier) - @subscribers[identifier]['onConnect']?() - - unsubscribe: (identifier) => - @unsubscribeOnServer(identifier, 'unsubscribe') - delete @subscribers[identifier] - - subscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'subscribe', identifier: identifier } - - unsubscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier } - - pingReceived: (timestamp) => - if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL - console.log "Websocket connection is stale. Reconnecting.." - @reconnect() - else - @startWaitingForPing() - @lastPingTime = timestamp + createConsumer: (url) -> + new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee new file mode 100644 index 0000000000..4f7d2abada --- /dev/null +++ b/lib/assets/javascripts/cable/connection.js.coffee @@ -0,0 +1,53 @@ +class Cable.Connection + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @websocket.send(JSON.stringify(data)) + true + else + false + + open: => + @websocket = new WebSocket(@consumer.url) + @websocket.onmessage = @onMessage + @websocket.onopen = @onOpen + @websocket.onclose = @onClose + @websocket.onerror = @onError + + close: -> + @websocket.close() unless @isClosed() + + reopen: -> + if @isClosed() + @open() + else + @websocket.onclose = @open + @websocket.onerror = @open + @websocket.close() + + isOpen: -> + @websocket.readyState is WebSocket.OPEN + + isClosed: -> + @websocket.readyState in [ WebSocket.CLOSED, WebSocket.CLOSING ] + + onMessage: (message) => + data = JSON.parse message.data + @consumer.subscribers.notify(data.identifier, "received", data.message) + + onOpen: => + @consumer.subscribers.reload() + + onClose: => + @disconnect() + + onError: => + @disconnect() + @websocket.onclose = -> # no-op + @websocket.onerror = -> # no-op + try @close() + + disconnect: -> + @consumer.subscribers.notifyAll("disconnected") diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.js.coffee new file mode 100644 index 0000000000..fc5093c5eb --- /dev/null +++ b/lib/assets/javascripts/cable/connection_monitor.js.coffee @@ -0,0 +1,65 @@ +class Cable.ConnectionMonitor + identifier: Cable.PING_IDENTIFIER + + pollInterval: + min: 2 + max: 30 + + staleThreshold: + startedAt: 4 + pingedAt: 8 + + constructor: (@consumer) -> + @consumer.subscribers.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + + stop: -> + @stoppedAt = now() + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @pollInterval + interval = 4 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts += 1 + @consumer.connection.reopen() + + connectionIsStale: -> + if @pingedAt + secondsSince(@pingedAt) > @staleThreshold.pingedAt + else + secondsSince(@startedAt) > @staleThreshold.startedAt + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.js.coffee new file mode 100644 index 0000000000..b9c08807f2 --- /dev/null +++ b/lib/assets/javascripts/cable/consumer.js.coffee @@ -0,0 +1,18 @@ +#= require cable/connection +#= require cable/connection_monitor +#= require cable/subscription +#= require cable/subscriber_manager + +class Cable.Consumer + constructor: (@url) -> + @subscribers = new Cable.SubscriberManager this + @connection = new Cable.Connection this + @connectionMonitor = new Cable.ConnectionMonitor this + + createSubscription: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new Cable.Subscription this, params, mixin + + send: (data) -> + @connection.send(data) diff --git a/lib/assets/javascripts/cable/subscriber_manager.js.coffee b/lib/assets/javascripts/cable/subscriber_manager.js.coffee new file mode 100644 index 0000000000..0b6a16590c --- /dev/null +++ b/lib/assets/javascripts/cable/subscriber_manager.js.coffee @@ -0,0 +1,38 @@ +class Cable.SubscriberManager + constructor: (@consumer) -> + @subscribers = [] + + add: (subscriber) -> + @subscribers.push(subscriber) + @notify(subscriber, "initialized") + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + reload: -> + for subscriber in @subscribers + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + remove: (subscriber) -> + @sendCommand(subscriber, "unsubscribe") + @subscribers = (s for s in @subscribers when s isnt subscriber) + + notifyAll: (callbackName, args...) -> + for subscriber in @subscribers + @notify(subscriber, callbackName, args...) + + notify: (subscriber, callbackName, args...) -> + if typeof subscriber is "string" + subscribers = (s for s in @subscribers when s.identifier is subscriber) + else + subscribers = [subscriber] + + for subscriber in subscribers + subscriber[callbackName]?(args...) + + sendCommand: (subscriber, command) -> + {identifier} = subscriber + if identifier is Cable.PING_IDENTIFIER + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.js.coffee new file mode 100644 index 0000000000..74cc35a7a7 --- /dev/null +++ b/lib/assets/javascripts/cable/subscription.js.coffee @@ -0,0 +1,22 @@ +class Cable.Subscription + constructor: (@consumer, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @consumer.subscribers.add(this) + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @consumer.subscribers.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee deleted file mode 100644 index 2f07affb19..0000000000 --- a/lib/assets/javascripts/channel.js.coffee +++ /dev/null @@ -1,34 +0,0 @@ -class @Cable.Channel - constructor: (params = {}) -> - @channelName ?= "#{@underscore(@constructor.name)}_channel" - - params['channel'] = @channelName - @channelIdentifier = JSON.stringify params - - cable.subscribe(@channelIdentifier, { - onConnect: @connected - onDisconnect: @disconnected - onReceiveData: @received - }) - - - connected: => - # Override in the subclass - - disconnected: => - # Override in the subclass - - received: (data) => - # Override in the subclass - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - cable.sendData @channelIdentifier, JSON.stringify data - - send: (data) -> - cable.sendData @channelIdentifier, JSON.stringify data - - - underscore: (value) -> - value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1)
\ No newline at end of file |