diff options
author | David Heinemeier Hansson <david@loudthinking.com> | 2015-07-04 21:30:48 +0200 |
---|---|---|
committer | David Heinemeier Hansson <david@loudthinking.com> | 2015-07-04 21:30:48 +0200 |
commit | 0de65cf2d8860377d45020a83866073c5fec188c (patch) | |
tree | d39b559162936fd79049595d69098fd7712a8fe3 /lib | |
parent | 9886a995f5f0b32d0d400074c48221cb0f6b911e (diff) | |
parent | 5de01033150b70982f23a42670c55348a7371c4b (diff) | |
download | rails-0de65cf2d8860377d45020a83866073c5fec188c.tar.gz rails-0de65cf2d8860377d45020a83866073c5fec188c.tar.bz2 rails-0de65cf2d8860377d45020a83866073c5fec188c.zip |
Merge branch 'master' of github.com:basecamp/action_cable
Diffstat (limited to 'lib')
29 files changed, 668 insertions, 486 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 26b3980deb..aaf48efa4b 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -19,10 +19,10 @@ require 'action_cable/engine' if defined?(Rails) module ActionCable VERSION = '0.0.3' - autoload :Channel, 'action_cable/channel' - autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' autoload :Connection, 'action_cable/connection' + autoload :Channel, 'action_cable/channel' + autoload :RemoteConnection, 'action_cable/remote_connection' autoload :RemoteConnections, 'action_cable/remote_connections' autoload :Broadcaster, 'action_cable/broadcaster' diff --git a/lib/action_cable/broadcaster.rb b/lib/action_cable/broadcaster.rb deleted file mode 100644 index 7d8cc90970..0000000000 --- a/lib/action_cable/broadcaster.rb +++ /dev/null @@ -1,17 +0,0 @@ -module ActionCable - class Broadcaster - attr_reader :server, :channel, :redis - delegate :logger, to: :server - - def initialize(server, channel) - @server = server - @channel = channel - @redis = @server.threaded_redis - end - - def broadcast(message) - logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" - redis.publish channel, message.to_json - end - end -end diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb index 94cdc8d722..0432052514 100644 --- a/lib/action_cable/channel.rb +++ b/lib/action_cable/channel.rb @@ -1,7 +1,7 @@ module ActionCable module Channel autoload :Callbacks, 'action_cable/channel/callbacks' - autoload :Redis, 'action_cable/channel/redis' + autoload :Streams, 'action_cable/channel/streams' autoload :Base, 'action_cable/channel/base' end end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 12a5789bdc..335d2d9d7c 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -2,7 +2,7 @@ module ActionCable module Channel class Base include Callbacks - include Redis + include Streams on_subscribe :start_periodic_timers on_unsubscribe :stop_periodic_timers @@ -10,16 +10,10 @@ module ActionCable attr_reader :params, :connection delegate :logger, to: :connection - class_attribute :channel_name - class << self def matches?(identifier) raise "Please implement #{name}#matches? method" end - - def find_name - @name ||= channel_name || to_s.demodulize.underscore - end end def initialize(connection, channel_identifier, params = {}) @@ -138,4 +132,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/channel/redis.rb b/lib/action_cable/channel/redis.rb deleted file mode 100644 index 0f77dc0418..0000000000 --- a/lib/action_cable/channel/redis.rb +++ /dev/null @@ -1,37 +0,0 @@ -module ActionCable - module Channel - module Redis - extend ActiveSupport::Concern - - included do - on_unsubscribe :unsubscribe_from_all_channels - delegate :pubsub, to: :connection - end - - def subscribe_to(redis_channel, callback = nil) - callback ||= default_subscription_callback(redis_channel) - @_redis_channels ||= [] - @_redis_channels << [ redis_channel, callback ] - - pubsub.subscribe(redis_channel, &callback) - logger.info "#{channel_name} subscribed to broadcasts from #{redis_channel}" - end - - def unsubscribe_from_all_channels - if @_redis_channels - @_redis_channels.each do |redis_channel, callback| - pubsub.unsubscribe_proc(redis_channel, callback) - logger.info "#{channel_name} unsubscribed to broadcasts from #{redis_channel}" - end - end - end - - protected - def default_subscription_callback(channel) - -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "broadcast from #{channel}" - end - end - end - end -end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb new file mode 100644 index 0000000000..3eac776e61 --- /dev/null +++ b/lib/action_cable/channel/streams.rb @@ -0,0 +1,40 @@ +module ActionCable + module Channel + module Streams + extend ActiveSupport::Concern + + included do + on_unsubscribe :stop_all_streams + end + + def stream_from(broadcasting, callback = nil) + callback ||= default_stream_callback(broadcasting) + + streams << [ broadcasting, callback ] + pubsub.subscribe broadcasting, &callback + + logger.info "#{channel_name} is streaming from #{broadcasting}" + end + + def stop_all_streams + streams.each do |broadcasting, callback| + pubsub.unsubscribe_proc broadcasting, callback + logger.info "#{channel_name} stopped streaming from #{broadcasting}" + end + end + + private + delegate :pubsub, to: :connection + + def streams + @_streams ||= [] + end + + def default_stream_callback(broadcasting) + -> (message) do + transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + end + end + end + end +end diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index a9048926e4..1b4a6ecc23 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,8 +1,12 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' + autoload :Heartbeat, 'action_cable/connection/heartbeat' + autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :Identifier, 'action_cable/connection/identifier' + autoload :MessageBuffer, 'action_cable/connection/message_buffer' + autoload :WebSocket, 'action_cable/connection/web_socket' + autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 89d0844031..69c0db9167 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,113 +1,68 @@ module ActionCable module Connection class Base - include InternalChannel, Identifier + include Identification + include InternalChannel - PING_INTERVAL = 3 - - class_attribute :identifiers - self.identifiers = Set.new - - def self.identified_by(*identifiers) - self.identifiers += identifiers - end - - attr_reader :env, :server, :logger + attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :logger + def initialize(server, env) - @started_at = Time.now + @server, @env = server, env - @server = server - @env = env - @accept_messages = false - @pending_messages = [] - @subscriptions = {} + @logger = new_tagged_logger - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + @websocket = ActionCable::Connection::WebSocket.new(env) + @heartbeat = ActionCable::Connection::Heartbeat.new(self) + @subscriptions = ActionCable::Connection::Subscriptions.new(self) + @message_buffer = ActionCable::Connection::MessageBuffer.new(self) + + @started_at = Time.now end def process logger.info started_request_message - if websocket? - @websocket = Faye::WebSocket.new(@env) - - @websocket.on(:open) do |event| - transmit_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp } - worker_pool.async.invoke(self, :initialize_connection) - end - - @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if @accept_messages - worker_pool.async.invoke(self, :received_data, message) - else - @pending_messages << message - end - end - end - - @websocket.on(:close) do |event| - logger.info finished_request_message - - worker_pool.async.invoke(self, :on_connection_closed) - EventMachine.cancel_timer(@ping_timer) if @ping_timer - end - - @websocket.rack_response + if websocket.possible? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + + respond_to_successful_request else - invalid_request + respond_to_invalid_request end end - def received_data(data) - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['command'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) + def receive(data_in_json) + if websocket.alive? + subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received unrecognized command in #{data.inspect}" - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection + logger.error "Received data without a live websocket (#{data.inspect})" end end def transmit(data) - @websocket.send data + websocket.transmit data end - def statistics - { - identifier: connection_identifier, - started_at: @started_at, - subscriptions: @subscriptions.keys - } + def close + logger.error "Closing connection" + websocket.close end - def handle_exception - close_connection + + def send_async(method, *arguments) + worker_pool.async.invoke(self, method, *arguments) end - def close_connection - logger.error "Closing connection" - @websocket.close + def statistics + { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } end + protected def request @request ||= ActionDispatch::Request.new(Rails.application.env_config.merge(env)) @@ -117,79 +72,59 @@ module ActionCable request.cookie_jar end - def initialize_connection + + private + attr_reader :websocket + attr_reader :heartbeat, :subscriptions, :message_buffer + + def on_open server.add_connection(self) connect if respond_to?(:connect) subscribe_to_internal_channel + heartbeat.start - @accept_messages = true - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + message_buffer.process! end - def on_connection_closed - server.remove_connection(self) - - cleanup_subscriptions - unsubscribe_from_internal_channel - disconnect if respond_to?(:disconnect) + def on_message(message) + message_buffer.append message end - def transmit_ping_timestamp - transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end + def on_close + logger.info finished_request_message - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + server.remove_connection(self) - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + subscriptions.cleanup + unsubscribe_from_internal_channel + heartbeat.stop - if subscription_klass - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Subscription class not found (#{data.inspect})" - end - rescue Exception => e - logger.error "Could not subscribe to channel (#{data.inspect})" - log_exception(e) + disconnect if respond_to?(:disconnect) end - def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data']) - else - raise "Unable to process message because no subscription was found (#{message.inspect})" - end - rescue Exception => e - logger.error "Could not process message (#{message.inspect})" - log_exception(e) - end - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].perform_disconnection - @subscriptions.delete(data['identifier']) + def respond_to_successful_request + websocket.rack_response end - def invalid_request + def respond_to_invalid_request logger.info finished_request_message - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end - def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN - end - def websocket? - @is_websocket ||= Faye::WebSocket.websocket?(@env) + # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. + def new_tagged_logger + TaggedLoggerProxy.new server.logger, + tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end @@ -197,19 +132,10 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end - - def log_exception(e) - logger.error "There was an exception: #{e.class} - #{e.message}" - logger.error e.backtrace.join("\n") - end - - def log_tags - server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } - end end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb new file mode 100644 index 0000000000..47cd937c25 --- /dev/null +++ b/lib/action_cable/connection/heartbeat.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + class Heartbeat + BEAT_INTERVAL = 3 + + def initialize(connection) + @connection = connection + end + + def start + beat + @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } + end + + def stop + EventMachine.cancel_timer(@timer) if @timer + end + + private + attr_reader :connection + + def beat + connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb new file mode 100644 index 0000000000..246636198b --- /dev/null +++ b/lib/action_cable/connection/identification.rb @@ -0,0 +1,26 @@ +module ActionCable + module Connection + module Identification + extend ActiveSupport::Concern + + included do + class_attribute :identifiers + self.identifiers = Set.new + end + + class_methods do + def identified_by(*identifiers) + self.identifiers += identifiers + end + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + def connection_gid(ids) + ids.map { |o| o.to_global_id.to_s }.sort.join(":") + end + end + end +end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb deleted file mode 100644 index a608fc546a..0000000000 --- a/lib/action_cable/connection/identifier.rb +++ /dev/null @@ -1,17 +0,0 @@ -module ActionCable - module Connection - module Identifier - def internal_redis_channel - "action_cable/#{connection_identifier}" - end - - def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}.compact - end - - def connection_gid(ids) - ids.map {|o| o.to_global_id.to_s }.sort.join(":") - end - end - end -end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 3a11bcaf7b..70e5e58373 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -3,6 +3,10 @@ module ActionCable module InternalChannel extend ActiveSupport::Concern + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + def subscribe_to_internal_channel if connection_identifier.present? callback = -> (message) { process_internal_message(message) } @@ -27,13 +31,13 @@ module ActionCable case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" - @websocket.close + websocket.close end rescue Exception => e logger.error "There was an exception - #{e.class}(#{e.message})" logger.error e.backtrace.join("\n") - handle_exception + close end end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb new file mode 100644 index 0000000000..615266e0cb --- /dev/null +++ b/lib/action_cable/connection/message_buffer.rb @@ -0,0 +1,51 @@ +module ActionCable + module Connection + class MessageBuffer + def initialize(connection) + @connection = connection + @buffered_messages = [] + end + + def append(message) + if valid? message + if processing? + receive message + else + buffer message + end + else + connection.logger.error "Couldn't handle non-string message: #{message.class}" + end + end + + def processing? + @processing + end + + def process! + @processing = true + receive_buffered_messages + end + + private + attr_reader :connection + attr_accessor :buffered_messages + + def valid?(message) + message.is_a?(String) + end + + def receive(message) + connection.send_async :receive, message + end + + def buffer(message) + buffered_messages << message + end + + def receive_buffered_messages + receive buffered_messages.shift until buffered_messages.empty? + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb new file mode 100644 index 0000000000..24ab1bdfbf --- /dev/null +++ b/lib/action_cable/connection/subscriptions.rb @@ -0,0 +1,69 @@ +module ActionCable + module Connection + class Subscriptions + def initialize(connection) + @connection = connection + @subscriptions = {} + end + + def execute_command(data) + case data['command'] + when 'subscribe' then add data + when 'unsubscribe' then remove data + when 'message' then perform_action data + else + logger.error "Received unrecognized command in #{data.inspect}" + end + rescue Exception => e + logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}" + end + + def add(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = connection.server.registered_channels.detect do |channel_klass| + channel_klass == id_options[:channel].safe_constantize + end + + if subscription_klass + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + else + logger.error "Subscription class not found (#{data.inspect})" + end + end + + def remove(data) + logger.info "Unsubscribing from channel: #{data['identifier']}" + subscriptions[data['identifier']].perform_disconnection + subscriptions.delete(data['identifier']) + end + + def perform_action(data) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) + end + + + def identifiers + subscriptions.keys + end + + def cleanup + subscriptions.each { |id, channel| channel.perform_disconnection } + end + + + private + attr_reader :connection, :subscriptions + delegate :logger, to: :connection + + def find(data) + if subscription = subscriptions[data['identifier']] + subscription + else + raise "Unable to find subscription with identifier: #{data['identifier']}" + end + end + end + end +end diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index d99cc2e9a3..e0c0075adf 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -1,7 +1,9 @@ module ActionCable module Connection + # Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional + # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. + # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy - def initialize(logger, tags:) @logger = logger @tags = tags.flatten diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb new file mode 100644 index 0000000000..135a28cfe4 --- /dev/null +++ b/lib/action_cable/connection/web_socket.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + # Decorate the Faye::WebSocket with helpers we need. + class WebSocket + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + end + + def possible? + websocket + end + + def alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + websocket.send data + end + + private + attr_reader :websocket + end + end +end diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb index 912fb6eb57..d7a3f0125d 100644 --- a/lib/action_cable/remote_connection.rb +++ b/lib/action_cable/remote_connection.rb @@ -2,7 +2,7 @@ module ActionCable class RemoteConnection class InvalidIdentifiersError < StandardError; end - include Connection::Identifier + include Connection::Identification, Connection::InternalChannel def initialize(server, ids) @server = server @@ -10,19 +10,16 @@ module ActionCable end def disconnect - message = { type: 'disconnect' }.to_json - redis.publish(internal_redis_channel, message) + server.broadcast_without_logging internal_redis_channel, type: 'disconnect' end def identifiers - @server.connection_identifiers - end - - def redis - @server.threaded_redis + server.connection_identifiers end private + attr_reader :server + def set_identifier_instance_vars(ids) raise InvalidIdentifiersError unless valid_identifiers?(ids) ids.each { |k,v| instance_variable_set("@#{k}", v) } diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 322fc85519..fa7bad4e32 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,75 +1,7 @@ module ActionCable - class Server - cattr_accessor(:logger, instance_reader: true) { Rails.logger } - - attr_accessor :registered_channels, :redis_config, :log_tags - - def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) - @redis_config = redis_config.with_indifferent_access - @registered_channels = Set.new(channels) - @worker_pool_size = worker_pool_size - @connection_class = connection - @log_tags = log_tags - - @connections = [] - - logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" - end - - def call(env) - @connection_class.new(self, env).process - end - - def worker_pool - @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size) - end - - def pubsub - @pubsub ||= redis.pubsub - end - - def redis - @redis ||= begin - redis = EM::Hiredis.connect(@redis_config[:url]) - redis.on(:reconnect_failed) do - logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close_connection - end - redis - end - end - - def threaded_redis - @threaded_redis ||= Redis.new(redis_config) - end - - def remote_connections - @remote_connections ||= RemoteConnections.new(self) - end - - def broadcaster_for(channel) - Broadcaster.new(self, channel) - end - - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end - - def connection_identifiers - @connection_class.identifiers - end - - def add_connection(connection) - @connections << connection - end - - def remove_connection(connection) - @connections.delete connection - end - - def open_connections_statistics - @connections.map(&:statistics) - end + module Server + autoload :Base, 'action_cable/server/base' + autoload :Broadcasting, 'action_cable/server/broadcasting' + autoload :Worker, 'action_cable/server/worker' end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb new file mode 100644 index 0000000000..e8109b325d --- /dev/null +++ b/lib/action_cable/server/base.rb @@ -0,0 +1,67 @@ +module ActionCable + module Server + class Base + include ActionCable::Server::Broadcasting + + cattr_accessor(:logger, instance_reader: true) { Rails.logger } + + attr_accessor :registered_channels, :redis_config, :log_tags + + def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) + @redis_config = redis_config.with_indifferent_access + @registered_channels = Set.new(channels) + @worker_pool_size = worker_pool_size + @connection_class = connection + @log_tags = log_tags + + @connections = [] + + logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" + end + + def call(env) + @connection_class.new(self, env).process + end + + def worker_pool + @worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size) + end + + def pubsub + @pubsub ||= redis.pubsub + end + + def redis + @redis ||= begin + redis = EM::Hiredis.connect(@redis_config[:url]) + redis.on(:reconnect_failed) do + logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + redis + end + end + + def remote_connections + @remote_connections ||= RemoteConnections.new(self) + end + + def connection_identifiers + @connection_class.identifiers + end + + def add_connection(connection) + @connections << connection + end + + def remove_connection(connection) + @connections.delete connection + end + + def open_connections_statistics + @connections.map(&:statistics) + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb new file mode 100644 index 0000000000..b0e51b8ba8 --- /dev/null +++ b/lib/action_cable/server/broadcasting.rb @@ -0,0 +1,39 @@ +module ActionCable + module Server + module Broadcasting + def broadcast(broadcasting, message) + broadcaster_for(broadcasting).broadcast(message) + end + + def broadcast_without_logging(broadcasting, message) + broadcaster_for(broadcasting).broadcast_without_logging(message) + end + + def broadcaster_for(broadcasting) + Broadcaster.new(self, broadcasting) + end + + def broadcasting_redis + @broadcasting_redis ||= Redis.new(redis_config) + end + + private + class Broadcaster + attr_reader :server, :broadcasting + + def initialize(server, broadcasting) + @server, @broadcasting = server, broadcasting + end + + def broadcast(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" + broadcast_without_logging(message) + end + + def broadcast_without_logging(message) + server.broadcasting_redis.publish broadcasting, message.to_json + end + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..0491cb9ab0 --- /dev/null +++ b/lib/action_cable/server/worker.rb @@ -0,0 +1,32 @@ +module ActionCable + module Server + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def invoke(receiver, method, *args) + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + + def run_periodic_timer(channel, callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + end + + private + def logger + ActionCable::Server::Base.logger + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb deleted file mode 100644 index 6800a75d1d..0000000000 --- a/lib/action_cable/worker.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - class Worker - include ActiveSupport::Callbacks - include Celluloid - - define_callbacks :work - - def invoke(receiver, method, *args) - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") - - receiver.handle_exception if receiver.respond_to?(:handle_exception) - end - - def run_periodic_timer(channel, callback) - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - - private - def logger - ActionCable::Server.logger - end - end -end diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.js.coffee index 7c033d3b08..0bd1757505 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.js.coffee @@ -1,124 +1,8 @@ #= require_self -#= require_tree . +#= require cable/consumer -class @Cable - MAX_CONNECTION_INTERVAL: 5 * 1000 - PING_STALE_INTERVAL: 8 +@Cable = + PING_IDENTIFIER: "_ping" - constructor: (@cableUrl) -> - @subscribers = {} - @resetPingTime() - @resetConnectionAttemptsCount() - @connect() - - connect: -> - @connection = @createConnection() - - createConnection: -> - connection = new WebSocket(@cableUrl) - connection.onmessage = @receiveData - connection.onopen = @connected - connection.onclose = @reconnect - - connection.onerror = @reconnect - connection - - isConnected: => - @connection?.readyState is 1 - - sendData: (identifier, data) => - if @isConnected() - @connection.send JSON.stringify { command: 'message', identifier: identifier, data: data } - - receiveData: (message) => - data = JSON.parse message.data - - if data.identifier is '_ping' - @pingReceived(data.message) - else - @subscribers[data.identifier]?.onReceiveData(data.message) - - connected: => - @startWaitingForPing() - @resetConnectionAttemptsCount() - - for identifier, callbacks of @subscribers - @subscribeOnServer(identifier) - callbacks['onConnect']?() - - reconnect: => - @removeExistingConnection() - - @resetPingTime() - @disconnected() - - setTimeout => - @incrementConnectionAttemptsCount() - @connect() - , @generateReconnectInterval() - - removeExistingConnection: => - if @connection? - @clearPingWaitTimeout() - - @connection.onclose = -> # no-op - @connection.onerror = -> # no-op - @connection.close() - @connection = null - - resetConnectionAttemptsCount: => - @connectionAttempts = 1 - - incrementConnectionAttemptsCount: => - @connectionAttempts += 1 - - generateReconnectInterval: () -> - interval = (Math.pow(2, @connectionAttempts) - 1) * 1000 - if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval - - startWaitingForPing: => - @clearPingWaitTimeout() - - @waitForPingTimeout = setTimeout => - console.log "Ping took too long to arrive. Reconnecting.." - @reconnect() - , @PING_STALE_INTERVAL * 1000 - - clearPingWaitTimeout: => - clearTimeout(@waitForPingTimeout) - - resetPingTime: => - @lastPingTime = null - - disconnected: => - callbacks['onDisconnect']?() for identifier, callbacks of @subscribers - - giveUp: => - # Show an error message - - subscribe: (identifier, callbacks) => - @subscribers[identifier] = callbacks - - if @isConnected() - @subscribeOnServer(identifier) - @subscribers[identifier]['onConnect']?() - - unsubscribe: (identifier) => - @unsubscribeOnServer(identifier, 'unsubscribe') - delete @subscribers[identifier] - - subscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'subscribe', identifier: identifier } - - unsubscribeOnServer: (identifier) => - if @isConnected() - @connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier } - - pingReceived: (timestamp) => - if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL - console.log "Websocket connection is stale. Reconnecting.." - @reconnect() - else - @startWaitingForPing() - @lastPingTime = timestamp + createConsumer: (url) -> + new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee new file mode 100644 index 0000000000..4f7d2abada --- /dev/null +++ b/lib/assets/javascripts/cable/connection.js.coffee @@ -0,0 +1,53 @@ +class Cable.Connection + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @websocket.send(JSON.stringify(data)) + true + else + false + + open: => + @websocket = new WebSocket(@consumer.url) + @websocket.onmessage = @onMessage + @websocket.onopen = @onOpen + @websocket.onclose = @onClose + @websocket.onerror = @onError + + close: -> + @websocket.close() unless @isClosed() + + reopen: -> + if @isClosed() + @open() + else + @websocket.onclose = @open + @websocket.onerror = @open + @websocket.close() + + isOpen: -> + @websocket.readyState is WebSocket.OPEN + + isClosed: -> + @websocket.readyState in [ WebSocket.CLOSED, WebSocket.CLOSING ] + + onMessage: (message) => + data = JSON.parse message.data + @consumer.subscribers.notify(data.identifier, "received", data.message) + + onOpen: => + @consumer.subscribers.reload() + + onClose: => + @disconnect() + + onError: => + @disconnect() + @websocket.onclose = -> # no-op + @websocket.onerror = -> # no-op + try @close() + + disconnect: -> + @consumer.subscribers.notifyAll("disconnected") diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.js.coffee new file mode 100644 index 0000000000..fc5093c5eb --- /dev/null +++ b/lib/assets/javascripts/cable/connection_monitor.js.coffee @@ -0,0 +1,65 @@ +class Cable.ConnectionMonitor + identifier: Cable.PING_IDENTIFIER + + pollInterval: + min: 2 + max: 30 + + staleThreshold: + startedAt: 4 + pingedAt: 8 + + constructor: (@consumer) -> + @consumer.subscribers.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + + stop: -> + @stoppedAt = now() + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @pollInterval + interval = 4 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts += 1 + @consumer.connection.reopen() + + connectionIsStale: -> + if @pingedAt + secondsSince(@pingedAt) > @staleThreshold.pingedAt + else + secondsSince(@startedAt) > @staleThreshold.startedAt + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.js.coffee new file mode 100644 index 0000000000..b9c08807f2 --- /dev/null +++ b/lib/assets/javascripts/cable/consumer.js.coffee @@ -0,0 +1,18 @@ +#= require cable/connection +#= require cable/connection_monitor +#= require cable/subscription +#= require cable/subscriber_manager + +class Cable.Consumer + constructor: (@url) -> + @subscribers = new Cable.SubscriberManager this + @connection = new Cable.Connection this + @connectionMonitor = new Cable.ConnectionMonitor this + + createSubscription: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new Cable.Subscription this, params, mixin + + send: (data) -> + @connection.send(data) diff --git a/lib/assets/javascripts/cable/subscriber_manager.js.coffee b/lib/assets/javascripts/cable/subscriber_manager.js.coffee new file mode 100644 index 0000000000..0b6a16590c --- /dev/null +++ b/lib/assets/javascripts/cable/subscriber_manager.js.coffee @@ -0,0 +1,38 @@ +class Cable.SubscriberManager + constructor: (@consumer) -> + @subscribers = [] + + add: (subscriber) -> + @subscribers.push(subscriber) + @notify(subscriber, "initialized") + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + reload: -> + for subscriber in @subscribers + if @sendCommand(subscriber, "subscribe") + @notify(subscriber, "connected") + + remove: (subscriber) -> + @sendCommand(subscriber, "unsubscribe") + @subscribers = (s for s in @subscribers when s isnt subscriber) + + notifyAll: (callbackName, args...) -> + for subscriber in @subscribers + @notify(subscriber, callbackName, args...) + + notify: (subscriber, callbackName, args...) -> + if typeof subscriber is "string" + subscribers = (s for s in @subscribers when s.identifier is subscriber) + else + subscribers = [subscriber] + + for subscriber in subscribers + subscriber[callbackName]?(args...) + + sendCommand: (subscriber, command) -> + {identifier} = subscriber + if identifier is Cable.PING_IDENTIFIER + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.js.coffee new file mode 100644 index 0000000000..74cc35a7a7 --- /dev/null +++ b/lib/assets/javascripts/cable/subscription.js.coffee @@ -0,0 +1,22 @@ +class Cable.Subscription + constructor: (@consumer, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @consumer.subscribers.add(this) + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @consumer.subscribers.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/lib/assets/javascripts/channel.js.coffee b/lib/assets/javascripts/channel.js.coffee deleted file mode 100644 index 2f07affb19..0000000000 --- a/lib/assets/javascripts/channel.js.coffee +++ /dev/null @@ -1,34 +0,0 @@ -class @Cable.Channel - constructor: (params = {}) -> - @channelName ?= "#{@underscore(@constructor.name)}_channel" - - params['channel'] = @channelName - @channelIdentifier = JSON.stringify params - - cable.subscribe(@channelIdentifier, { - onConnect: @connected - onDisconnect: @disconnected - onReceiveData: @received - }) - - - connected: => - # Override in the subclass - - disconnected: => - # Override in the subclass - - received: (data) => - # Override in the subclass - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - cable.sendData @channelIdentifier, JSON.stringify data - - send: (data) -> - cable.sendData @channelIdentifier, JSON.stringify data - - - underscore: (value) -> - value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1)
\ No newline at end of file |