diff options
author | Javan Makhmali <javan@javan.us> | 2015-06-26 14:35:39 -0400 |
---|---|---|
committer | Javan Makhmali <javan@javan.us> | 2015-06-26 14:35:39 -0400 |
commit | f4bd1aee96d50b4d0bd915774e0c8d23f4b12748 (patch) | |
tree | 6ff0fb08be146cc6255252610ca97eb083cf0914 /lib | |
parent | e3cb3696cfaa766b62d644411fe71e4e64aab85a (diff) | |
parent | 5541b8fcaf9ec40e6f16c50cb45030838a3e3450 (diff) | |
download | rails-f4bd1aee96d50b4d0bd915774e0c8d23f4b12748.tar.gz rails-f4bd1aee96d50b4d0bd915774e0c8d23f4b12748.tar.bz2 rails-f4bd1aee96d50b4d0bd915774e0c8d23f4b12748.zip |
Merge pull request #2 from basecamp/create-channel
Channel factory
Diffstat (limited to 'lib')
-rw-r--r-- | lib/action_cable/channel/base.rb | 8 | ||||
-rw-r--r-- | lib/action_cable/connection/subscriptions.rb | 4 | ||||
-rw-r--r-- | lib/assets/javascripts/cable.js.coffee | 126 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/connection.js.coffee | 53 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/connection_monitor.js.coffee | 41 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/consumer.js.coffee | 16 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/subscriber_manager.js.coffee | 36 | ||||
-rw-r--r-- | lib/assets/javascripts/cable/subscription.js.coffee | 22 | ||||
-rw-r--r-- | lib/assets/javascripts/channel.js.coffee | 33 |
9 files changed, 176 insertions, 163 deletions
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 83ba2cb3d2..6c55a8ed65 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -10,16 +10,10 @@ module ActionCable attr_reader :params, :connection delegate :logger, to: :connection - class_attribute :channel_name - class << self def matches?(identifier) raise "Please implement #{name}#matches? method" end - - def find_name - @name ||= channel_name || to_s.demodulize.underscore - end end def initialize(connection, channel_identifier, params = {}) @@ -138,4 +132,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 9e7a8a5f73..ae191c7795 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -23,7 +23,7 @@ module ActionCable id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.registered_channels.detect do |channel_klass| - channel_klass.find_name == id_options[:channel] + channel_klass == id_options[:channel].safe_constantize end if subscription_klass @@ -66,4 +66,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee index 7c033d3b08..0bd1757505 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.js.coffee @@ -1,124 +1,8 @@ #= require_self -#= require_tree . +#= require cable/consumer -class @Cable - MAX_CONNECTION_INTERVAL: 5 * 1000 - PING_STALE_INTERVAL: 8 +@Cable = + PING_IDENTIFIER: "_ping" - constructor: (@cableUrl) -> - @subscribers = {} - @resetPingTime() - @resetConnectionAttemptsCount() - @connect() - - connect: -> - @connection = @createConnection() - - createConnection: -> - connection = new WebSocket(@cableUrl) - connection.onmessage = @receiveData - connection.onopen = @connected - connection.onclose = @reconnect - - connection.onerror = @reconnect - connection - - isConnected: => - @connection?.readyState is 1 - - sendData: (identifier, data) => - if @isConnected() - @connection.send JSON.stringify { command: 'message', identifier: identifier, data: data } - - receiveData: (message) => - data = JSON.parse message.data - - if data.identifier is '_ping' - @pingReceived(data.message) - else - @subscribers[data.identifier]?.onReceiveData(data.message) - - connected: => - @startWaitingForPing() - @resetConnectionAttemptsCount() - - for identifier, callbacks of @subscribers - @subscribeOnServer(identifier) - callbacks['onConnect']?() - - reconnect: => - @removeExistingConnection() - - @resetPingTime() - @disconnected() - - setTimeout => - @incrementConnectionAttemptsCount() - @connect() - , @generateReconnectInterval() - - removeExistingConnection: => - if @connection? - @clearPingWaitTimeout() - - @connection.onclose = -> # no-op - @connection.onerror = -> # no-op - @connection.close() - @connection = null - - resetConnectionAttemptsCount: => - @connectionAttempts = 1 - - incrementConnectionAttemptsCount: => - @connectionAttempts += 1 - - generateReconnectInterval: () -> - interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 - if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval - - startWaitingForPing: => - @clearPingWaitTimeout() - - @waitForPingTimeout = setTimeout => - console.log "Ping took too long to arrive. Reconnecting.." - @reconnect() - , @PING_STALE_INTERVAL * 1000 - - clearPingWaitTimeout: => - clearTimeout(@waitForPingTimeout) - - resetPingTime: => - @lastPingTime = null - - disconnected: => - callbacks['onDisconnect']?() for identifier, callbacks of @subscribers - - giveUp: => - # Show an error message - - subscribe: (identifier, callbacks) => - @subscribers[identifier] = callbacks - - if @isConnected() - @subscribeOnServer(identifier) - @subscribers[identifier]['onConnect']?() - - unsubscribe: (identifier) => - @unsubscribeOnServer(identifier, 'unsubscribe') - delete @subscribers[identifier] - - subscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'subscribe', identifier: identifier } - - unsubscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier } - - pingReceived: (timestamp) => - if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL - console.log "Websocket connection is stale. Reconnecting.." - @reconnect() - else - @startWaitingForPing() - @lastPingTime = timestamp + createConsumer: (url) -> + new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee new file mode 100644 index 0000000000..cd9539a6aa --- /dev/null +++ b/lib/assets/javascripts/cable/connection.js.coffee @@ -0,0 +1,53 @@ +#= require cable/connection_monitor + +class Cable.Connection + constructor: (@consumer) -> + new Cable.ConnectionMonitor @consumer + @open() + + send: (data) -> + if @isOpen() + @websocket.send(JSON.stringify(data)) + true + else + false + + open: -> + @websocket = new WebSocket(@consumer.url) + @websocket.onmessage = @onMessage + @websocket.onopen = @onOpen + @websocket.onclose = @onClose + @websocket.onerror = @onError + @websocket + + close: -> + @websocket.close() unless @isClosed() + + reopen: -> + @close() + @open() + + isOpen: -> + @websocket.readyState is WebSocket.OPEN + + isClosed: -> + @websocket.readyState in [ WebSocket.CLOSED, WebSocket.CLOSING ] + + onMessage: (message) => + data = JSON.parse message.data + @consumer.subscribers.notify(data.identifier, "received", data.message) + + onOpen: => + @consumer.subscribers.reload() + + onClose: => + @disconnect() + + onError: => + @disconnect() + @websocket.onclose = -> # no-op + @websocket.onerror = -> # no-op + try @close() + + disconnect: -> + @consumer.subscribers.notifyAll("disconnected") diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.js.coffee new file mode 100644 index 0000000000..bb4ee8f7f6 --- /dev/null +++ b/lib/assets/javascripts/cable/connection_monitor.js.coffee @@ -0,0 +1,41 @@ +class Cable.ConnectionMonitor + MAX_CONNECTION_INTERVAL: 5 * 1000 + PING_STALE_INTERVAL: 8 * 1000 + + identifier: Cable.PING_IDENTIFIER + + constructor: (@consumer) -> + @reset() + @consumer.subscribers.add(this) + @pollConnection() + + connected: -> + @reset() + @pingedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @connectionAttempts = 1 + + pollConnection: -> + setTimeout => + @reconnect() if @connectionIsStale() + @pollConnection() + , @getPollTimeout() + + getPollTimeout: -> + interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 + if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval + + connectionIsStale: -> + @pingedAt? and (now() - @pingedAt) > @PING_STALE_INTERVAL + + reconnect: -> + console.log "Ping took too long to arrive. Reconnecting.." + @connectionAttempts += 1 + @consumer.connection.reopen() + + now = -> + new Date().getTime() diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.js.coffee new file mode 100644 index 0000000000..a9abd6256a --- /dev/null +++ b/lib/assets/javascripts/cable/consumer.js.coffee @@ -0,0 +1,16 @@ +#= require cable/connection +#= require cable/subscription +#= require cable/subscriber_manager + +class Cable.Consumer + constructor: (@url) -> + @subscribers = new Cable.SubscriberManager this + @connection = new Cable.Connection this + + createSubscription: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new Cable.Subscription this, params, mixin + + send: (data) -> + @connection.send(data) diff --git a/lib/assets/javascripts/cable/subscriber_manager.js.coffee b/lib/assets/javascripts/cable/subscriber_manager.js.coffee new file mode 100644 index 0000000000..922c74808c --- /dev/null +++ b/lib/assets/javascripts/cable/subscriber_manager.js.coffee @@ -0,0 +1,36 @@ +class Cable.SubscriberManager + constructor: (@consumer) -> + @subscribers = [] + + add: (subscriber) -> + @subscribers.push(subscriber) + @notify(subscriber, "initialized") + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + reload: -> + for subscriber in @subscribers + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + remove: (subscriber) -> + @sendCommand(subscriber, "unsubscribe") + @subscribers = (s for s in @subscribers when s isnt subscriber) + + notifyAll: (callbackName, args...) -> + for subscriber in @subscribers + @notify(subscriber, callbackName, args...) + + notify: (subscriber, callbackName, args...) -> + if typeof subscriber is "string" + subscribers = (s for s in @subscribers when s.identifier is subscriber) + else + subscribers = [subscriber] + + for subscriber in subscribers + subscriber[callbackName]?(args...) + + sendCommand: (subscriber, command) -> + {identifier} = subscriber + return true if identifier is Cable.PING_IDENTIFIER + @consumer.send({command, identifier}) diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.js.coffee new file mode 100644 index 0000000000..74cc35a7a7 --- /dev/null +++ b/lib/assets/javascripts/cable/subscription.js.coffee @@ -0,0 +1,22 @@ +class Cable.Subscription + constructor: (@consumer, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @consumer.subscribers.add(this) + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @consumer.subscribers.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee deleted file mode 100644 index c972334140..0000000000 --- a/lib/assets/javascripts/channel.js.coffee +++ /dev/null @@ -1,33 +0,0 @@ -class @Cable.Channel - constructor: (params = {}) -> - {channelName} = @constructor - - if channelName? - params['channel'] = channelName - @channelIdentifier = JSON.stringify params - else - throw new Error "This channel's constructor is missing the required 'channelName' property" - - cable.subscribe(@channelIdentifier, { - onConnect: @connected - onDisconnect: @disconnected - onReceiveData: @received - }) - - - connected: => - # Override in the subclass - - disconnected: => - # Override in the subclass - - received: (data) => - # Override in the subclass - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - cable.sendData @channelIdentifier, JSON.stringify data - - send: (data) -> - cable.sendData @channelIdentifier, JSON.stringify data |