aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action_cable.gemspec2
-rw-r--r--lib/action_cable.rb4
-rw-r--r--lib/action_cable/broadcaster.rb17
-rw-r--r--lib/action_cable/channel.rb2
-rw-r--r--lib/action_cable/channel/base.rb10
-rw-r--r--lib/action_cable/channel/redis.rb37
-rw-r--r--lib/action_cable/channel/streams.rb40
-rw-r--r--lib/action_cable/connection.rb6
-rw-r--r--lib/action_cable/connection/base.rb196
-rw-r--r--lib/action_cable/connection/heartbeat.rb27
-rw-r--r--lib/action_cable/connection/identification.rb26
-rw-r--r--lib/action_cable/connection/identifier.rb17
-rw-r--r--lib/action_cable/connection/internal_channel.rb8
-rw-r--r--lib/action_cable/connection/message_buffer.rb51
-rw-r--r--lib/action_cable/connection/subscriptions.rb69
-rw-r--r--lib/action_cable/connection/tagged_logger_proxy.rb4
-rw-r--r--lib/action_cable/connection/web_socket.rb27
-rw-r--r--lib/action_cable/remote_connection.rb13
-rw-r--r--lib/action_cable/server.rb76
-rw-r--r--lib/action_cable/server/base.rb67
-rw-r--r--lib/action_cable/server/broadcasting.rb39
-rw-r--r--lib/action_cable/server/worker.rb32
-rw-r--r--lib/action_cable/worker.rb30
-rw-r--r--lib/assets/javascripts/cable.js.coffee126
-rw-r--r--lib/assets/javascripts/cable/connection.js.coffee53
-rw-r--r--lib/assets/javascripts/cable/connection_monitor.js.coffee65
-rw-r--r--lib/assets/javascripts/cable/consumer.js.coffee18
-rw-r--r--lib/assets/javascripts/cable/subscriber_manager.js.coffee38
-rw-r--r--lib/assets/javascripts/cable/subscription.js.coffee22
-rw-r--r--lib/assets/javascripts/channel.js.coffee34
-rw-r--r--test/channel_test.rb2
-rw-r--r--test/server_test.rb2
32 files changed, 671 insertions, 489 deletions
diff --git a/action_cable.gemspec b/action_cable.gemspec
index 714256a73e..1d68c2b0a5 100644
--- a/action_cable.gemspec
+++ b/action_cable.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
s.name = 'action_cable'
- s.version = '0.0.3'
+ s.version = '0.1.0'
s.summary = 'Framework for websockets.'
s.description = 'Action Cable is a framework for realtime communication over websockets.'
diff --git a/lib/action_cable.rb b/lib/action_cable.rb
index 26b3980deb..aaf48efa4b 100644
--- a/lib/action_cable.rb
+++ b/lib/action_cable.rb
@@ -19,10 +19,10 @@ require 'action_cable/engine' if defined?(Rails)
module ActionCable
VERSION = '0.0.3'
- autoload :Channel, 'action_cable/channel'
- autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
autoload :Connection, 'action_cable/connection'
+ autoload :Channel, 'action_cable/channel'
+
autoload :RemoteConnection, 'action_cable/remote_connection'
autoload :RemoteConnections, 'action_cable/remote_connections'
autoload :Broadcaster, 'action_cable/broadcaster'
diff --git a/lib/action_cable/broadcaster.rb b/lib/action_cable/broadcaster.rb
deleted file mode 100644
index 7d8cc90970..0000000000
--- a/lib/action_cable/broadcaster.rb
+++ /dev/null
@@ -1,17 +0,0 @@
-module ActionCable
- class Broadcaster
- attr_reader :server, :channel, :redis
- delegate :logger, to: :server
-
- def initialize(server, channel)
- @server = server
- @channel = channel
- @redis = @server.threaded_redis
- end
-
- def broadcast(message)
- logger.info "[ActionCable] Broadcasting to #{channel}: #{message}"
- redis.publish channel, message.to_json
- end
- end
-end
diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb
index 94cdc8d722..0432052514 100644
--- a/lib/action_cable/channel.rb
+++ b/lib/action_cable/channel.rb
@@ -1,7 +1,7 @@
module ActionCable
module Channel
autoload :Callbacks, 'action_cable/channel/callbacks'
- autoload :Redis, 'action_cable/channel/redis'
+ autoload :Streams, 'action_cable/channel/streams'
autoload :Base, 'action_cable/channel/base'
end
end
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb
index 12a5789bdc..335d2d9d7c 100644
--- a/lib/action_cable/channel/base.rb
+++ b/lib/action_cable/channel/base.rb
@@ -2,7 +2,7 @@ module ActionCable
module Channel
class Base
include Callbacks
- include Redis
+ include Streams
on_subscribe :start_periodic_timers
on_unsubscribe :stop_periodic_timers
@@ -10,16 +10,10 @@ module ActionCable
attr_reader :params, :connection
delegate :logger, to: :connection
- class_attribute :channel_name
-
class << self
def matches?(identifier)
raise "Please implement #{name}#matches? method"
end
-
- def find_name
- @name ||= channel_name || to_s.demodulize.underscore
- end
end
def initialize(connection, channel_identifier, params = {})
@@ -138,4 +132,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/channel/redis.rb b/lib/action_cable/channel/redis.rb
deleted file mode 100644
index 0f77dc0418..0000000000
--- a/lib/action_cable/channel/redis.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-module ActionCable
- module Channel
- module Redis
- extend ActiveSupport::Concern
-
- included do
- on_unsubscribe :unsubscribe_from_all_channels
- delegate :pubsub, to: :connection
- end
-
- def subscribe_to(redis_channel, callback = nil)
- callback ||= default_subscription_callback(redis_channel)
- @_redis_channels ||= []
- @_redis_channels << [ redis_channel, callback ]
-
- pubsub.subscribe(redis_channel, &callback)
- logger.info "#{channel_name} subscribed to broadcasts from #{redis_channel}"
- end
-
- def unsubscribe_from_all_channels
- if @_redis_channels
- @_redis_channels.each do |redis_channel, callback|
- pubsub.unsubscribe_proc(redis_channel, callback)
- logger.info "#{channel_name} unsubscribed to broadcasts from #{redis_channel}"
- end
- end
- end
-
- protected
- def default_subscription_callback(channel)
- -> (message) do
- transmit ActiveSupport::JSON.decode(message), via: "broadcast from #{channel}"
- end
- end
- end
- end
-end
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
new file mode 100644
index 0000000000..3eac776e61
--- /dev/null
+++ b/lib/action_cable/channel/streams.rb
@@ -0,0 +1,40 @@
+module ActionCable
+ module Channel
+ module Streams
+ extend ActiveSupport::Concern
+
+ included do
+ on_unsubscribe :stop_all_streams
+ end
+
+ def stream_from(broadcasting, callback = nil)
+ callback ||= default_stream_callback(broadcasting)
+
+ streams << [ broadcasting, callback ]
+ pubsub.subscribe broadcasting, &callback
+
+ logger.info "#{channel_name} is streaming from #{broadcasting}"
+ end
+
+ def stop_all_streams
+ streams.each do |broadcasting, callback|
+ pubsub.unsubscribe_proc broadcasting, callback
+ logger.info "#{channel_name} stopped streaming from #{broadcasting}"
+ end
+ end
+
+ private
+ delegate :pubsub, to: :connection
+
+ def streams
+ @_streams ||= []
+ end
+
+ def default_stream_callback(broadcasting)
+ -> (message) do
+ transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ end
+ end
+ end
+ end
+end
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index a9048926e4..1b4a6ecc23 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -1,8 +1,12 @@
module ActionCable
module Connection
autoload :Base, 'action_cable/connection/base'
+ autoload :Heartbeat, 'action_cable/connection/heartbeat'
+ autoload :Identification, 'action_cable/connection/identification'
autoload :InternalChannel, 'action_cable/connection/internal_channel'
- autoload :Identifier, 'action_cable/connection/identifier'
+ autoload :MessageBuffer, 'action_cable/connection/message_buffer'
+ autoload :WebSocket, 'action_cable/connection/web_socket'
+ autoload :Subscriptions, 'action_cable/connection/subscriptions'
autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
end
end
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 89d0844031..69c0db9167 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -1,113 +1,68 @@
module ActionCable
module Connection
class Base
- include InternalChannel, Identifier
+ include Identification
+ include InternalChannel
- PING_INTERVAL = 3
-
- class_attribute :identifiers
- self.identifiers = Set.new
-
- def self.identified_by(*identifiers)
- self.identifiers += identifiers
- end
-
- attr_reader :env, :server, :logger
+ attr_reader :server, :env
delegate :worker_pool, :pubsub, to: :server
+ attr_reader :logger
+
def initialize(server, env)
- @started_at = Time.now
+ @server, @env = server, env
- @server = server
- @env = env
- @accept_messages = false
- @pending_messages = []
- @subscriptions = {}
+ @logger = new_tagged_logger
- @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags)
+ @websocket = ActionCable::Connection::WebSocket.new(env)
+ @heartbeat = ActionCable::Connection::Heartbeat.new(self)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @message_buffer = ActionCable::Connection::MessageBuffer.new(self)
+
+ @started_at = Time.now
end
def process
logger.info started_request_message
- if websocket?
- @websocket = Faye::WebSocket.new(@env)
-
- @websocket.on(:open) do |event|
- transmit_ping_timestamp
- @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp }
- worker_pool.async.invoke(self, :initialize_connection)
- end
-
- @websocket.on(:message) do |event|
- message = event.data
-
- if message.is_a?(String)
- if @accept_messages
- worker_pool.async.invoke(self, :received_data, message)
- else
- @pending_messages << message
- end
- end
- end
-
- @websocket.on(:close) do |event|
- logger.info finished_request_message
-
- worker_pool.async.invoke(self, :on_connection_closed)
- EventMachine.cancel_timer(@ping_timer) if @ping_timer
- end
-
- @websocket.rack_response
+ if websocket.possible?
+ websocket.on(:open) { |event| send_async :on_open }
+ websocket.on(:message) { |event| on_message event.data }
+ websocket.on(:close) { |event| send_async :on_close }
+
+ respond_to_successful_request
else
- invalid_request
+ respond_to_invalid_request
end
end
- def received_data(data)
- return unless websocket_alive?
-
- data = ActiveSupport::JSON.decode data
-
- case data['command']
- when 'subscribe'
- subscribe_channel(data)
- when 'unsubscribe'
- unsubscribe_channel(data)
- when 'message'
- process_message(data)
+ def receive(data_in_json)
+ if websocket.alive?
+ subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
- logger.error "Received unrecognized command in #{data.inspect}"
- end
- end
-
- def cleanup_subscriptions
- @subscriptions.each do |id, channel|
- channel.perform_disconnection
+ logger.error "Received data without a live websocket (#{data.inspect})"
end
end
def transmit(data)
- @websocket.send data
+ websocket.transmit data
end
- def statistics
- {
- identifier: connection_identifier,
- started_at: @started_at,
- subscriptions: @subscriptions.keys
- }
+ def close
+ logger.error "Closing connection"
+ websocket.close
end
- def handle_exception
- close_connection
+
+ def send_async(method, *arguments)
+ worker_pool.async.invoke(self, method, *arguments)
end
- def close_connection
- logger.error "Closing connection"
- @websocket.close
+ def statistics
+ { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
end
+
protected
def request
@request ||= ActionDispatch::Request.new(Rails.application.env_config.merge(env))
@@ -117,79 +72,59 @@ module ActionCable
request.cookie_jar
end
- def initialize_connection
+
+ private
+ attr_reader :websocket
+ attr_reader :heartbeat, :subscriptions, :message_buffer
+
+ def on_open
server.add_connection(self)
connect if respond_to?(:connect)
subscribe_to_internal_channel
+ heartbeat.start
- @accept_messages = true
- worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
+ message_buffer.process!
end
- def on_connection_closed
- server.remove_connection(self)
-
- cleanup_subscriptions
- unsubscribe_from_internal_channel
- disconnect if respond_to?(:disconnect)
+ def on_message(message)
+ message_buffer.append message
end
- def transmit_ping_timestamp
- transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
+ def on_close
+ logger.info finished_request_message
- def subscribe_channel(data)
- id_key = data['identifier']
- id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ server.remove_connection(self)
- subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
+ subscriptions.cleanup
+ unsubscribe_from_internal_channel
+ heartbeat.stop
- if subscription_klass
- @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
- else
- logger.error "Subscription class not found (#{data.inspect})"
- end
- rescue Exception => e
- logger.error "Could not subscribe to channel (#{data.inspect})"
- log_exception(e)
+ disconnect if respond_to?(:disconnect)
end
- def process_message(message)
- if @subscriptions[message['identifier']]
- @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data'])
- else
- raise "Unable to process message because no subscription was found (#{message.inspect})"
- end
- rescue Exception => e
- logger.error "Could not process message (#{message.inspect})"
- log_exception(e)
- end
- def unsubscribe_channel(data)
- logger.info "Unsubscribing from channel: #{data['identifier']}"
- @subscriptions[data['identifier']].perform_disconnection
- @subscriptions.delete(data['identifier'])
+ def respond_to_successful_request
+ websocket.rack_response
end
- def invalid_request
+ def respond_to_invalid_request
logger.info finished_request_message
- [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
+ [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
- def websocket_alive?
- @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
- end
- def websocket?
- @is_websocket ||= Faye::WebSocket.websocket?(@env)
+ # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
+ def new_tagged_logger
+ TaggedLoggerProxy.new server.logger,
+ tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
def started_request_message
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [Websocket]' : '',
request.ip,
Time.now.to_default_s ]
end
@@ -197,19 +132,10 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [Websocket]' : '',
request.ip,
Time.now.to_default_s ]
end
-
- def log_exception(e)
- logger.error "There was an exception: #{e.class} - #{e.message}"
- logger.error e.backtrace.join("\n")
- end
-
- def log_tags
- server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
- end
end
end
end
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
new file mode 100644
index 0000000000..47cd937c25
--- /dev/null
+++ b/lib/action_cable/connection/heartbeat.rb
@@ -0,0 +1,27 @@
+module ActionCable
+ module Connection
+ class Heartbeat
+ BEAT_INTERVAL = 3
+
+ def initialize(connection)
+ @connection = connection
+ end
+
+ def start
+ beat
+ @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
+ end
+
+ def stop
+ EventMachine.cancel_timer(@timer) if @timer
+ end
+
+ private
+ attr_reader :connection
+
+ def beat
+ connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb
new file mode 100644
index 0000000000..246636198b
--- /dev/null
+++ b/lib/action_cable/connection/identification.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module Connection
+ module Identification
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :identifiers
+ self.identifiers = Set.new
+ end
+
+ class_methods do
+ def identified_by(*identifiers)
+ self.identifiers += identifiers
+ end
+ end
+
+ def connection_identifier
+ @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ def connection_gid(ids)
+ ids.map { |o| o.to_global_id.to_s }.sort.join(":")
+ end
+ end
+ end
+end
diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb
deleted file mode 100644
index a608fc546a..0000000000
--- a/lib/action_cable/connection/identifier.rb
+++ /dev/null
@@ -1,17 +0,0 @@
-module ActionCable
- module Connection
- module Identifier
- def internal_redis_channel
- "action_cable/#{connection_identifier}"
- end
-
- def connection_identifier
- @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}.compact
- end
-
- def connection_gid(ids)
- ids.map {|o| o.to_global_id.to_s }.sort.join(":")
- end
- end
- end
-end
diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb
index 3a11bcaf7b..70e5e58373 100644
--- a/lib/action_cable/connection/internal_channel.rb
+++ b/lib/action_cable/connection/internal_channel.rb
@@ -3,6 +3,10 @@ module ActionCable
module InternalChannel
extend ActiveSupport::Concern
+ def internal_redis_channel
+ "action_cable/#{connection_identifier}"
+ end
+
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
@@ -27,13 +31,13 @@ module ActionCable
case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
- @websocket.close
+ websocket.close
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
- handle_exception
+ close
end
end
end
diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb
new file mode 100644
index 0000000000..615266e0cb
--- /dev/null
+++ b/lib/action_cable/connection/message_buffer.rb
@@ -0,0 +1,51 @@
+module ActionCable
+ module Connection
+ class MessageBuffer
+ def initialize(connection)
+ @connection = connection
+ @buffered_messages = []
+ end
+
+ def append(message)
+ if valid? message
+ if processing?
+ receive message
+ else
+ buffer message
+ end
+ else
+ connection.logger.error "Couldn't handle non-string message: #{message.class}"
+ end
+ end
+
+ def processing?
+ @processing
+ end
+
+ def process!
+ @processing = true
+ receive_buffered_messages
+ end
+
+ private
+ attr_reader :connection
+ attr_accessor :buffered_messages
+
+ def valid?(message)
+ message.is_a?(String)
+ end
+
+ def receive(message)
+ connection.send_async :receive, message
+ end
+
+ def buffer(message)
+ buffered_messages << message
+ end
+
+ def receive_buffered_messages
+ receive buffered_messages.shift until buffered_messages.empty?
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb
new file mode 100644
index 0000000000..24ab1bdfbf
--- /dev/null
+++ b/lib/action_cable/connection/subscriptions.rb
@@ -0,0 +1,69 @@
+module ActionCable
+ module Connection
+ class Subscriptions
+ def initialize(connection)
+ @connection = connection
+ @subscriptions = {}
+ end
+
+ def execute_command(data)
+ case data['command']
+ when 'subscribe' then add data
+ when 'unsubscribe' then remove data
+ when 'message' then perform_action data
+ else
+ logger.error "Received unrecognized command in #{data.inspect}"
+ end
+ rescue Exception => e
+ logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
+ end
+
+ def add(data)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+
+ subscription_klass = connection.server.registered_channels.detect do |channel_klass|
+ channel_klass == id_options[:channel].safe_constantize
+ end
+
+ if subscription_klass
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ else
+ logger.error "Subscription class not found (#{data.inspect})"
+ end
+ end
+
+ def remove(data)
+ logger.info "Unsubscribing from channel: #{data['identifier']}"
+ subscriptions[data['identifier']].perform_disconnection
+ subscriptions.delete(data['identifier'])
+ end
+
+ def perform_action(data)
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
+ end
+
+
+ def identifiers
+ subscriptions.keys
+ end
+
+ def cleanup
+ subscriptions.each { |id, channel| channel.perform_disconnection }
+ end
+
+
+ private
+ attr_reader :connection, :subscriptions
+ delegate :logger, to: :connection
+
+ def find(data)
+ if subscription = subscriptions[data['identifier']]
+ subscription
+ else
+ raise "Unable to find subscription with identifier: #{data['identifier']}"
+ end
+ end
+ end
+ end
+end
diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb
index d99cc2e9a3..e0c0075adf 100644
--- a/lib/action_cable/connection/tagged_logger_proxy.rb
+++ b/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -1,7 +1,9 @@
module ActionCable
module Connection
+ # Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional
+ # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
+ # The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
-
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb
new file mode 100644
index 0000000000..135a28cfe4
--- /dev/null
+++ b/lib/action_cable/connection/web_socket.rb
@@ -0,0 +1,27 @@
+module ActionCable
+ module Connection
+ # Decorate the Faye::WebSocket with helpers we need.
+ class WebSocket
+ delegate :rack_response, :close, :on, to: :websocket
+
+ def initialize(env)
+ @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ end
+
+ def possible?
+ websocket
+ end
+
+ def alive?
+ websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ websocket.send data
+ end
+
+ private
+ attr_reader :websocket
+ end
+ end
+end
diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb
index 912fb6eb57..d7a3f0125d 100644
--- a/lib/action_cable/remote_connection.rb
+++ b/lib/action_cable/remote_connection.rb
@@ -2,7 +2,7 @@ module ActionCable
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
- include Connection::Identifier
+ include Connection::Identification, Connection::InternalChannel
def initialize(server, ids)
@server = server
@@ -10,19 +10,16 @@ module ActionCable
end
def disconnect
- message = { type: 'disconnect' }.to_json
- redis.publish(internal_redis_channel, message)
+ server.broadcast_without_logging internal_redis_channel, type: 'disconnect'
end
def identifiers
- @server.connection_identifiers
- end
-
- def redis
- @server.threaded_redis
+ server.connection_identifiers
end
private
+ attr_reader :server
+
def set_identifier_instance_vars(ids)
raise InvalidIdentifiersError unless valid_identifiers?(ids)
ids.each { |k,v| instance_variable_set("@#{k}", v) }
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index 322fc85519..fa7bad4e32 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,75 +1,7 @@
module ActionCable
- class Server
- cattr_accessor(:logger, instance_reader: true) { Rails.logger }
-
- attr_accessor :registered_channels, :redis_config, :log_tags
-
- def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ])
- @redis_config = redis_config.with_indifferent_access
- @registered_channels = Set.new(channels)
- @worker_pool_size = worker_pool_size
- @connection_class = connection
- @log_tags = log_tags
-
- @connections = []
-
- logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})"
- end
-
- def call(env)
- @connection_class.new(self, env).process
- end
-
- def worker_pool
- @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size)
- end
-
- def pubsub
- @pubsub ||= redis.pubsub
- end
-
- def redis
- @redis ||= begin
- redis = EM::Hiredis.connect(@redis_config[:url])
- redis.on(:reconnect_failed) do
- logger.info "[ActionCable] Redis reconnect failed."
- # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
- # @connections.map &:close_connection
- end
- redis
- end
- end
-
- def threaded_redis
- @threaded_redis ||= Redis.new(redis_config)
- end
-
- def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
- end
-
- def broadcaster_for(channel)
- Broadcaster.new(self, channel)
- end
-
- def broadcast(channel, message)
- broadcaster_for(channel).broadcast(message)
- end
-
- def connection_identifiers
- @connection_class.identifiers
- end
-
- def add_connection(connection)
- @connections << connection
- end
-
- def remove_connection(connection)
- @connections.delete connection
- end
-
- def open_connections_statistics
- @connections.map(&:statistics)
- end
+ module Server
+ autoload :Base, 'action_cable/server/base'
+ autoload :Broadcasting, 'action_cable/server/broadcasting'
+ autoload :Worker, 'action_cable/server/worker'
end
end
diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb
new file mode 100644
index 0000000000..e8109b325d
--- /dev/null
+++ b/lib/action_cable/server/base.rb
@@ -0,0 +1,67 @@
+module ActionCable
+ module Server
+ class Base
+ include ActionCable::Server::Broadcasting
+
+ cattr_accessor(:logger, instance_reader: true) { Rails.logger }
+
+ attr_accessor :registered_channels, :redis_config, :log_tags
+
+ def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ])
+ @redis_config = redis_config.with_indifferent_access
+ @registered_channels = Set.new(channels)
+ @worker_pool_size = worker_pool_size
+ @connection_class = connection
+ @log_tags = log_tags
+
+ @connections = []
+
+ logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})"
+ end
+
+ def call(env)
+ @connection_class.new(self, env).process
+ end
+
+ def worker_pool
+ @worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size)
+ end
+
+ def pubsub
+ @pubsub ||= redis.pubsub
+ end
+
+ def redis
+ @redis ||= begin
+ redis = EM::Hiredis.connect(@redis_config[:url])
+ redis.on(:reconnect_failed) do
+ logger.info "[ActionCable] Redis reconnect failed."
+ # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
+ # @connections.map &:close
+ end
+ redis
+ end
+ end
+
+ def remote_connections
+ @remote_connections ||= RemoteConnections.new(self)
+ end
+
+ def connection_identifiers
+ @connection_class.identifiers
+ end
+
+ def add_connection(connection)
+ @connections << connection
+ end
+
+ def remove_connection(connection)
+ @connections.delete connection
+ end
+
+ def open_connections_statistics
+ @connections.map(&:statistics)
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb
new file mode 100644
index 0000000000..b0e51b8ba8
--- /dev/null
+++ b/lib/action_cable/server/broadcasting.rb
@@ -0,0 +1,39 @@
+module ActionCable
+ module Server
+ module Broadcasting
+ def broadcast(broadcasting, message)
+ broadcaster_for(broadcasting).broadcast(message)
+ end
+
+ def broadcast_without_logging(broadcasting, message)
+ broadcaster_for(broadcasting).broadcast_without_logging(message)
+ end
+
+ def broadcaster_for(broadcasting)
+ Broadcaster.new(self, broadcasting)
+ end
+
+ def broadcasting_redis
+ @broadcasting_redis ||= Redis.new(redis_config)
+ end
+
+ private
+ class Broadcaster
+ attr_reader :server, :broadcasting
+
+ def initialize(server, broadcasting)
+ @server, @broadcasting = server, broadcasting
+ end
+
+ def broadcast(message)
+ server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
+ broadcast_without_logging(message)
+ end
+
+ def broadcast_without_logging(message)
+ server.broadcasting_redis.publish broadcasting, message.to_json
+ end
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb
new file mode 100644
index 0000000000..0491cb9ab0
--- /dev/null
+++ b/lib/action_cable/server/worker.rb
@@ -0,0 +1,32 @@
+module ActionCable
+ module Server
+ class Worker
+ include ActiveSupport::Callbacks
+ include Celluloid
+
+ define_callbacks :work
+
+ def invoke(receiver, method, *args)
+ run_callbacks :work do
+ receiver.send method, *args
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ end
+
+ def run_periodic_timer(channel, callback)
+ run_callbacks :work do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ end
+ end
+
+ private
+ def logger
+ ActionCable::Server::Base.logger
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb
deleted file mode 100644
index 6800a75d1d..0000000000
--- a/lib/action_cable/worker.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-module ActionCable
- class Worker
- include ActiveSupport::Callbacks
- include Celluloid
-
- define_callbacks :work
-
- def invoke(receiver, method, *args)
- run_callbacks :work do
- receiver.send method, *args
- end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
-
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
- end
-
- def run_periodic_timer(channel, callback)
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- end
-
- private
- def logger
- ActionCable::Server.logger
- end
- end
-end
diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee
index 7c033d3b08..0bd1757505 100644
--- a/lib/assets/javascripts/cable.js.coffee
+++ b/lib/assets/javascripts/cable.js.coffee
@@ -1,124 +1,8 @@
#= require_self
-#= require_tree .
+#= require cable/consumer
-class @Cable
- MAX_CONNECTION_INTERVAL: 5 * 1000
- PING_STALE_INTERVAL: 8
+@Cable =
+ PING_IDENTIFIER: "_ping"
- constructor: (@cableUrl) ->
- @subscribers = {}
- @resetPingTime()
- @resetConnectionAttemptsCount()
- @connect()
-
- connect: ->
- @connection = @createConnection()
-
- createConnection: ->
- connection = new WebSocket(@cableUrl)
- connection.onmessage = @receiveData
- connection.onopen = @connected
- connection.onclose = @reconnect
-
- connection.onerror = @reconnect
- connection
-
- isConnected: =>
- @connection?.readyState is 1
-
- sendData: (identifier, data) =>
- if @isConnected()
- @connection.send JSON.stringify { command: 'message', identifier: identifier, data: data }
-
- receiveData: (message) =>
- data = JSON.parse message.data
-
- if data.identifier is '_ping'
- @pingReceived(data.message)
- else
- @subscribers[data.identifier]?.onReceiveData(data.message)
-
- connected: =>
- @startWaitingForPing()
- @resetConnectionAttemptsCount()
-
- for identifier, callbacks of @subscribers
- @subscribeOnServer(identifier)
- callbacks['onConnect']?()
-
- reconnect: =>
- @removeExistingConnection()
-
- @resetPingTime()
- @disconnected()
-
- setTimeout =>
- @incrementConnectionAttemptsCount()
- @connect()
- , @generateReconnectInterval()
-
- removeExistingConnection: =>
- if @connection?
- @clearPingWaitTimeout()
-
- @connection.onclose = -> # no-op
- @connection.onerror = -> # no-op
- @connection.close()
- @connection = null
-
- resetConnectionAttemptsCount: =>
- @connectionAttempts = 1
-
- incrementConnectionAttemptsCount: =>
- @connectionAttempts += 1
-
- generateReconnectInterval: () ->
- interval = (Math.pow(2, @connectionAttempts) - 1) * 1000
- if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval
-
- startWaitingForPing: =>
- @clearPingWaitTimeout()
-
- @waitForPingTimeout = setTimeout =>
- console.log "Ping took too long to arrive. Reconnecting.."
- @reconnect()
- , @PING_STALE_INTERVAL * 1000
-
- clearPingWaitTimeout: =>
- clearTimeout(@waitForPingTimeout)
-
- resetPingTime: =>
- @lastPingTime = null
-
- disconnected: =>
- callbacks['onDisconnect']?() for identifier, callbacks of @subscribers
-
- giveUp: =>
- # Show an error message
-
- subscribe: (identifier, callbacks) =>
- @subscribers[identifier] = callbacks
-
- if @isConnected()
- @subscribeOnServer(identifier)
- @subscribers[identifier]['onConnect']?()
-
- unsubscribe: (identifier) =>
- @unsubscribeOnServer(identifier, 'unsubscribe')
- delete @subscribers[identifier]
-
- subscribeOnServer: (identifier) =>
- if @isConnected()
- @connection.send JSON.stringify { command: 'subscribe', identifier: identifier }
-
- unsubscribeOnServer: (identifier) =>
- if @isConnected()
- @connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier }
-
- pingReceived: (timestamp) =>
- if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL
- console.log "Websocket connection is stale. Reconnecting.."
- @reconnect()
- else
- @startWaitingForPing()
- @lastPingTime = timestamp
+ createConsumer: (url) ->
+ new Cable.Consumer url
diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee
new file mode 100644
index 0000000000..4f7d2abada
--- /dev/null
+++ b/lib/assets/javascripts/cable/connection.js.coffee
@@ -0,0 +1,53 @@
+class Cable.Connection
+ constructor: (@consumer) ->
+ @open()
+
+ send: (data) ->
+ if @isOpen()
+ @websocket.send(JSON.stringify(data))
+ true
+ else
+ false
+
+ open: =>
+ @websocket = new WebSocket(@consumer.url)
+ @websocket.onmessage = @onMessage
+ @websocket.onopen = @onOpen
+ @websocket.onclose = @onClose
+ @websocket.onerror = @onError
+
+ close: ->
+ @websocket.close() unless @isClosed()
+
+ reopen: ->
+ if @isClosed()
+ @open()
+ else
+ @websocket.onclose = @open
+ @websocket.onerror = @open
+ @websocket.close()
+
+ isOpen: ->
+ @websocket.readyState is WebSocket.OPEN
+
+ isClosed: ->
+ @websocket.readyState in [ WebSocket.CLOSED, WebSocket.CLOSING ]
+
+ onMessage: (message) =>
+ data = JSON.parse message.data
+ @consumer.subscribers.notify(data.identifier, "received", data.message)
+
+ onOpen: =>
+ @consumer.subscribers.reload()
+
+ onClose: =>
+ @disconnect()
+
+ onError: =>
+ @disconnect()
+ @websocket.onclose = -> # no-op
+ @websocket.onerror = -> # no-op
+ try @close()
+
+ disconnect: ->
+ @consumer.subscribers.notifyAll("disconnected")
diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.js.coffee
new file mode 100644
index 0000000000..fc5093c5eb
--- /dev/null
+++ b/lib/assets/javascripts/cable/connection_monitor.js.coffee
@@ -0,0 +1,65 @@
+class Cable.ConnectionMonitor
+ identifier: Cable.PING_IDENTIFIER
+
+ pollInterval:
+ min: 2
+ max: 30
+
+ staleThreshold:
+ startedAt: 4
+ pingedAt: 8
+
+ constructor: (@consumer) ->
+ @consumer.subscribers.add(this)
+ @start()
+
+ connected: ->
+ @reset()
+ @pingedAt = now()
+
+ received: ->
+ @pingedAt = now()
+
+ reset: ->
+ @reconnectAttempts = 0
+
+ start: ->
+ @reset()
+ delete @stoppedAt
+ @startedAt = now()
+ @poll()
+
+ stop: ->
+ @stoppedAt = now()
+
+ poll: ->
+ setTimeout =>
+ unless @stoppedAt
+ @reconnectIfStale()
+ @poll()
+ , @getInterval()
+
+ getInterval: ->
+ {min, max} = @pollInterval
+ interval = 4 * Math.log(@reconnectAttempts + 1)
+ clamp(interval, min, max) * 1000
+
+ reconnectIfStale: ->
+ if @connectionIsStale()
+ @reconnectAttempts += 1
+ @consumer.connection.reopen()
+
+ connectionIsStale: ->
+ if @pingedAt
+ secondsSince(@pingedAt) > @staleThreshold.pingedAt
+ else
+ secondsSince(@startedAt) > @staleThreshold.startedAt
+
+ now = ->
+ new Date().getTime()
+
+ secondsSince = (time) ->
+ (now() - time) / 1000
+
+ clamp = (number, min, max) ->
+ Math.max(min, Math.min(max, number))
diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.js.coffee
new file mode 100644
index 0000000000..b9c08807f2
--- /dev/null
+++ b/lib/assets/javascripts/cable/consumer.js.coffee
@@ -0,0 +1,18 @@
+#= require cable/connection
+#= require cable/connection_monitor
+#= require cable/subscription
+#= require cable/subscriber_manager
+
+class Cable.Consumer
+ constructor: (@url) ->
+ @subscribers = new Cable.SubscriberManager this
+ @connection = new Cable.Connection this
+ @connectionMonitor = new Cable.ConnectionMonitor this
+
+ createSubscription: (channelName, mixin) ->
+ channel = channelName
+ params = if typeof channel is "object" then channel else {channel}
+ new Cable.Subscription this, params, mixin
+
+ send: (data) ->
+ @connection.send(data)
diff --git a/lib/assets/javascripts/cable/subscriber_manager.js.coffee b/lib/assets/javascripts/cable/subscriber_manager.js.coffee
new file mode 100644
index 0000000000..0b6a16590c
--- /dev/null
+++ b/lib/assets/javascripts/cable/subscriber_manager.js.coffee
@@ -0,0 +1,38 @@
+class Cable.SubscriberManager
+ constructor: (@consumer) ->
+ @subscribers = []
+
+ add: (subscriber) ->
+ @subscribers.push(subscriber)
+ @notify(subscriber, "initialized")
+ if @sendCommand(subscriber, "subscribe")
+ @notify(subscriber, "connected")
+
+ reload: ->
+ for subscriber in @subscribers
+ if @sendCommand(subscriber, "subscribe")
+ @notify(subscriber, "connected")
+
+ remove: (subscriber) ->
+ @sendCommand(subscriber, "unsubscribe")
+ @subscribers = (s for s in @subscribers when s isnt subscriber)
+
+ notifyAll: (callbackName, args...) ->
+ for subscriber in @subscribers
+ @notify(subscriber, callbackName, args...)
+
+ notify: (subscriber, callbackName, args...) ->
+ if typeof subscriber is "string"
+ subscribers = (s for s in @subscribers when s.identifier is subscriber)
+ else
+ subscribers = [subscriber]
+
+ for subscriber in subscribers
+ subscriber[callbackName]?(args...)
+
+ sendCommand: (subscriber, command) ->
+ {identifier} = subscriber
+ if identifier is Cable.PING_IDENTIFIER
+ @consumer.connection.isOpen()
+ else
+ @consumer.send({command, identifier})
diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.js.coffee
new file mode 100644
index 0000000000..74cc35a7a7
--- /dev/null
+++ b/lib/assets/javascripts/cable/subscription.js.coffee
@@ -0,0 +1,22 @@
+class Cable.Subscription
+ constructor: (@consumer, params = {}, mixin) ->
+ @identifier = JSON.stringify(params)
+ extend(this, mixin)
+ @consumer.subscribers.add(this)
+
+ # Perform a channel action with the optional data passed as an attribute
+ perform: (action, data = {}) ->
+ data.action = action
+ @send(data)
+
+ send: (data) ->
+ @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data))
+
+ unsubscribe: ->
+ @consumer.subscribers.remove(this)
+
+ extend = (object, properties) ->
+ if properties?
+ for key, value of properties
+ object[key] = value
+ object
diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee
deleted file mode 100644
index 2f07affb19..0000000000
--- a/lib/assets/javascripts/channel.js.coffee
+++ /dev/null
@@ -1,34 +0,0 @@
-class @Cable.Channel
- constructor: (params = {}) ->
- @channelName ?= "#{@underscore(@constructor.name)}_channel"
-
- params['channel'] = @channelName
- @channelIdentifier = JSON.stringify params
-
- cable.subscribe(@channelIdentifier, {
- onConnect: @connected
- onDisconnect: @disconnected
- onReceiveData: @received
- })
-
-
- connected: =>
- # Override in the subclass
-
- disconnected: =>
- # Override in the subclass
-
- received: (data) =>
- # Override in the subclass
-
- # Perform a channel action with the optional data passed as an attribute
- perform: (action, data = {}) ->
- data.action = action
- cable.sendData @channelIdentifier, JSON.stringify data
-
- send: (data) ->
- cable.sendData @channelIdentifier, JSON.stringify data
-
-
- underscore: (value) ->
- value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1) \ No newline at end of file
diff --git a/test/channel_test.rb b/test/channel_test.rb
index ad5fa04356..96987977ea 100644
--- a/test/channel_test.rb
+++ b/test/channel_test.rb
@@ -8,7 +8,7 @@ class ChannelTest < ActionCableTest
end
end
- class PingServer < ActionCable::Server
+ class PingServer < ActionCable::Server::Base
register_channels PingChannel
end
diff --git a/test/server_test.rb b/test/server_test.rb
index 824875bb99..1e02497f61 100644
--- a/test/server_test.rb
+++ b/test/server_test.rb
@@ -8,7 +8,7 @@ class ServerTest < ActionCableTest
end
end
- class ChatServer < ActionCable::Server
+ class ChatServer < ActionCable::Server::Base
register_channels ChatChannel
end