diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/action_cable.rb | 2 | ||||
-rw-r--r-- | lib/action_cable/engine.rb | 4 | ||||
-rw-r--r-- | lib/action_cable/server.rb | 14 | ||||
-rw-r--r-- | lib/assets/javascripts/cable.js.coffee | 107 | ||||
-rw-r--r-- | lib/assets/javascripts/channel.js.coffee | 27 |
5 files changed, 154 insertions, 0 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb index e7d8f4cbb1..0681b8bdde 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -12,6 +12,8 @@ require 'active_support/callbacks' require 'faye/websocket' require 'celluloid' +require 'action_cable/engine' if defined?(Rails) + module ActionCable VERSION = '0.0.1' diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb new file mode 100644 index 0000000000..6c943c7971 --- /dev/null +++ b/lib/action_cable/engine.rb @@ -0,0 +1,4 @@ +module ActionCable + class Engine < ::Rails::Engine + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 8f72d2ca7b..2449837105 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -8,6 +8,8 @@ module ActionCable cattr_accessor(:logger, instance_reader: true) { Rails.logger } + PING_INTERVAL = 3 + class << self def register_channels(*channel_classes) self.registered_channels += channel_classes @@ -32,6 +34,11 @@ module ActionCable @websocket = Faye::WebSocket.new(@env) + @websocket.on(:open) do |event| + broadcast_ping_timestamp + @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } + end + @websocket.on(:message) do |event| message = event.data worker_pool.async.invoke(self, :received_data, message) if message.is_a?(String) @@ -40,6 +47,8 @@ module ActionCable @websocket.on(:close) do |event| worker_pool.async.invoke(self, :cleanup_subscriptions) worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) + + EventMachine.cancel_timer(@ping_timer) if @ping_timer end @websocket.rack_response @@ -77,6 +86,10 @@ module ActionCable end private + def broadcast_ping_timestamp + broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + def subscribe_channel(data) id_key = data['identifier'] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access @@ -108,5 +121,6 @@ module ActionCable def invalid_request [404, {'Content-Type' => 'text/plain'}, ['Page not found']] end + end end diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee new file mode 100644 index 0000000000..2a70693bf0 --- /dev/null +++ b/lib/assets/javascripts/cable.js.coffee @@ -0,0 +1,107 @@ +#= require_self +#= require_tree . + +class @Cable + MAX_CONNECTION_ATTEMPTS: 10 + MAX_CONNECTION_INTERVAL: 5 * 1000 + MAX_PING_INTERVAL: 6 + + constructor: (@cableUrl) -> + @subscribers = {} + @resetPingTime() + @resetConnectionAttemptsCount() + @connect() + + connect: -> + @connection = @createConnection() + + createConnection: -> + connection = new WebSocket(@cableUrl) + connection.onmessage = @receiveData + connection.onopen = @connected + connection.onclose = @reconnect + + connection.onerror = @reconnect + connection + + isConnected: => + @connection?.readyState is 1 + + sendData: (identifier, data) => + if @isConnected() + @connection.send JSON.stringify { action: 'message', identifier: identifier, data: data } + + receiveData: (message) => + data = JSON.parse message.data + + if data.identifier is '_ping' + @pingReceived(data.message) + else + @subscribers[data.identifier]?.onReceiveData(data.message) + + connected: => + @resetConnectionAttemptsCount() + + for identifier, callbacks of @subscribers + @subscribeOnServer(identifier) + callbacks['onConnect']?() + + reconnect: => + @resetPingTime() + @disconnected() + + setTimeout => + if @isMaxConnectionAttemptsReached() + @giveUp() + else + @incrementConnectionAttemptsCount() + @connect() + , @generateReconnectInterval() + + resetConnectionAttemptsCount: => + @connectionAttempts = 1 + + incrementConnectionAttemptsCount: => + @connectionAttempts += 1 + + isMaxConnectionAttemptsReached: => + @connectionAttempts > @MAX_CONNECTION_ATTEMPTS + + generateReconnectInterval: () -> + interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 + if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval + + resetPingTime: () => + @lastPingTime = null + + disconnected: => + callbacks['onDisconnect']?() for identifier, callbacks of @subscribers + + giveUp: => + # Show an error message + + subscribe: (identifier, callbacks) => + @subscribers[identifier] = callbacks + + if @isConnected() + @subscribeOnServer(identifier) + @subscribers[identifier]['onConnect']?() + + unsubscribe: (identifier) => + @unsubscribeOnServer(identifier, 'unsubscribe') + delete @subscribers[identifier] + + subscribeOnServer: (identifier) => + if @isConnected() + @connection.send JSON.stringify { action: 'subscribe', identifier: identifier } + + unsubscribeOnServer: (identifier) => + if @isConnected() + @connection.send JSON.stringify { action: 'unsubscribe', identifier: identifier } + + pingReceived: (timestamp) => + if @lastPingTime? and (timestamp - @lastPingTime) > @MAX_PING_INTERVAL + console.log "Websocket connection is stale. Reconnecting.." + @connection.close() + else + @lastPingTime = timestamp diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee new file mode 100644 index 0000000000..058bcc03aa --- /dev/null +++ b/lib/assets/javascripts/channel.js.coffee @@ -0,0 +1,27 @@ +class @Cable.Channel + constructor: (params = {}) -> + @channelName ?= @underscore @constructor.name + + params['channel'] = @channelName + @channelIdentifier = JSON.stringify params + + cable.subscribe(@channelIdentifier, { + onConnect: @connected + onDisconnect: @disconnected + onReceiveData: @received + }) + + connected: => + # Override in the subclass + + disconnected: => + # Override in the subclass + + received: (data) => + # Override in the subclass + + send: (data) -> + cable.sendData @channelIdentifier, JSON.stringify data + + underscore: (value) -> + value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1)
\ No newline at end of file |