aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/action_cable.rb2
-rw-r--r--lib/action_cable/engine.rb4
-rw-r--r--lib/action_cable/server.rb14
-rw-r--r--lib/assets/javascripts/cable.js.coffee107
-rw-r--r--lib/assets/javascripts/channel.js.coffee27
5 files changed, 154 insertions, 0 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb
index e7d8f4cbb1..0681b8bdde 100644
--- a/lib/action_cable.rb
+++ b/lib/action_cable.rb
@@ -12,6 +12,8 @@ require 'active_support/callbacks'
require 'faye/websocket'
require 'celluloid'
+require 'action_cable/engine' if defined?(Rails)
+
module ActionCable
VERSION = '0.0.1'
diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb
new file mode 100644
index 0000000000..6c943c7971
--- /dev/null
+++ b/lib/action_cable/engine.rb
@@ -0,0 +1,4 @@
+module ActionCable
+ class Engine < ::Rails::Engine
+ end
+end
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index 8f72d2ca7b..2449837105 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -8,6 +8,8 @@ module ActionCable
cattr_accessor(:logger, instance_reader: true) { Rails.logger }
+ PING_INTERVAL = 3
+
class << self
def register_channels(*channel_classes)
self.registered_channels += channel_classes
@@ -32,6 +34,11 @@ module ActionCable
@websocket = Faye::WebSocket.new(@env)
+ @websocket.on(:open) do |event|
+ broadcast_ping_timestamp
+ @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
+ end
+
@websocket.on(:message) do |event|
message = event.data
worker_pool.async.invoke(self, :received_data, message) if message.is_a?(String)
@@ -40,6 +47,8 @@ module ActionCable
@websocket.on(:close) do |event|
worker_pool.async.invoke(self, :cleanup_subscriptions)
worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
+
+ EventMachine.cancel_timer(@ping_timer) if @ping_timer
end
@websocket.rack_response
@@ -77,6 +86,10 @@ module ActionCable
end
private
+ def broadcast_ping_timestamp
+ broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
+ end
+
def subscribe_channel(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
@@ -108,5 +121,6 @@ module ActionCable
def invalid_request
[404, {'Content-Type' => 'text/plain'}, ['Page not found']]
end
+
end
end
diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee
new file mode 100644
index 0000000000..2a70693bf0
--- /dev/null
+++ b/lib/assets/javascripts/cable.js.coffee
@@ -0,0 +1,107 @@
+#= require_self
+#= require_tree .
+
+class @Cable
+ MAX_CONNECTION_ATTEMPTS: 10
+ MAX_CONNECTION_INTERVAL: 5 * 1000
+ MAX_PING_INTERVAL: 6
+
+ constructor: (@cableUrl) ->
+ @subscribers = {}
+ @resetPingTime()
+ @resetConnectionAttemptsCount()
+ @connect()
+
+ connect: ->
+ @connection = @createConnection()
+
+ createConnection: ->
+ connection = new WebSocket(@cableUrl)
+ connection.onmessage = @receiveData
+ connection.onopen = @connected
+ connection.onclose = @reconnect
+
+ connection.onerror = @reconnect
+ connection
+
+ isConnected: =>
+ @connection?.readyState is 1
+
+ sendData: (identifier, data) =>
+ if @isConnected()
+ @connection.send JSON.stringify { action: 'message', identifier: identifier, data: data }
+
+ receiveData: (message) =>
+ data = JSON.parse message.data
+
+ if data.identifier is '_ping'
+ @pingReceived(data.message)
+ else
+ @subscribers[data.identifier]?.onReceiveData(data.message)
+
+ connected: =>
+ @resetConnectionAttemptsCount()
+
+ for identifier, callbacks of @subscribers
+ @subscribeOnServer(identifier)
+ callbacks['onConnect']?()
+
+ reconnect: =>
+ @resetPingTime()
+ @disconnected()
+
+ setTimeout =>
+ if @isMaxConnectionAttemptsReached()
+ @giveUp()
+ else
+ @incrementConnectionAttemptsCount()
+ @connect()
+ , @generateReconnectInterval()
+
+ resetConnectionAttemptsCount: =>
+ @connectionAttempts = 1
+
+ incrementConnectionAttemptsCount: =>
+ @connectionAttempts += 1
+
+ isMaxConnectionAttemptsReached: =>
+ @connectionAttempts > @MAX_CONNECTION_ATTEMPTS
+
+ generateReconnectInterval: () ->
+ interval = (Math.pow(2, @connectionAttempts) - 1) * 1000
+ if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval
+
+ resetPingTime: () =>
+ @lastPingTime = null
+
+ disconnected: =>
+ callbacks['onDisconnect']?() for identifier, callbacks of @subscribers
+
+ giveUp: =>
+ # Show an error message
+
+ subscribe: (identifier, callbacks) =>
+ @subscribers[identifier] = callbacks
+
+ if @isConnected()
+ @subscribeOnServer(identifier)
+ @subscribers[identifier]['onConnect']?()
+
+ unsubscribe: (identifier) =>
+ @unsubscribeOnServer(identifier, 'unsubscribe')
+ delete @subscribers[identifier]
+
+ subscribeOnServer: (identifier) =>
+ if @isConnected()
+ @connection.send JSON.stringify { action: 'subscribe', identifier: identifier }
+
+ unsubscribeOnServer: (identifier) =>
+ if @isConnected()
+ @connection.send JSON.stringify { action: 'unsubscribe', identifier: identifier }
+
+ pingReceived: (timestamp) =>
+ if @lastPingTime? and (timestamp - @lastPingTime) > @MAX_PING_INTERVAL
+ console.log "Websocket connection is stale. Reconnecting.."
+ @connection.close()
+ else
+ @lastPingTime = timestamp
diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee
new file mode 100644
index 0000000000..058bcc03aa
--- /dev/null
+++ b/lib/assets/javascripts/channel.js.coffee
@@ -0,0 +1,27 @@
+class @Cable.Channel
+ constructor: (params = {}) ->
+ @channelName ?= @underscore @constructor.name
+
+ params['channel'] = @channelName
+ @channelIdentifier = JSON.stringify params
+
+ cable.subscribe(@channelIdentifier, {
+ onConnect: @connected
+ onDisconnect: @disconnected
+ onReceiveData: @received
+ })
+
+ connected: =>
+ # Override in the subclass
+
+ disconnected: =>
+ # Override in the subclass
+
+ received: (data) =>
+ # Override in the subclass
+
+ send: (data) ->
+ cable.sendData @channelIdentifier, JSON.stringify data
+
+ underscore: (value) ->
+ value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1) \ No newline at end of file