diff options
Diffstat (limited to 'lib')
30 files changed, 346 insertions, 223 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb index b269386c81..89ffa1fda7 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -1,31 +1,21 @@ -require 'eventmachine' -EM.epoll - -require 'set' - require 'active_support' -require 'active_support/json' -require 'active_support/concern' -require 'active_support/core_ext/hash/indifferent_access' -require 'active_support/core_ext/module/delegation' -require 'active_support/callbacks' - -require 'faye/websocket' -require 'celluloid' -require 'em-hiredis' -require 'redis' - -require 'action_cable/engine' if defined?(Rails) +require 'active_support/rails' require 'action_cable/version' module ActionCable - autoload :Server, 'action_cable/server' - autoload :Connection, 'action_cable/connection' - autoload :Channel, 'action_cable/channel' - autoload :RemoteConnections, 'action_cable/remote_connections' + extend ActiveSupport::Autoload # Singleton instance of the server module_function def server @server ||= ActionCable::Server::Base.new end + + eager_autoload do + autoload :Server + autoload :Connection + autoload :Channel + autoload :RemoteConnections + end end + +require 'action_cable/engine' if defined?(Rails) diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb index 3b973ba0a7..7ae262ce5f 100644 --- a/lib/action_cable/channel.rb +++ b/lib/action_cable/channel.rb @@ -1,10 +1,14 @@ module ActionCable module Channel - autoload :Base, 'action_cable/channel/base' - autoload :Broadcasting, 'action_cable/channel/broadcasting' - autoload :Callbacks, 'action_cable/channel/callbacks' - autoload :Naming, 'action_cable/channel/naming' - autoload :PeriodicTimers, 'action_cable/channel/periodic_timers' - autoload :Streams, 'action_cable/channel/streams' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Callbacks + autoload :Naming + autoload :PeriodicTimers + autoload :Streams + end end end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 171558e371..7607b5ad59 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -1,6 +1,8 @@ +require 'set' + module ActionCable module Channel - # The channel provides the basic structure of grouping behavior into logical units when communicating over the websocket connection. + # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection. # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply # responding to the subscriber's direct requests. # @@ -71,6 +73,8 @@ module ActionCable include Naming include Broadcasting + SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze + attr_reader :params, :connection delegate :logger, to: :connection @@ -115,6 +119,10 @@ module ActionCable @identifier = identifier @params = params + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + delegate_connection_identifiers subscribe_to_channel end @@ -135,10 +143,7 @@ module ActionCable # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel - run_callbacks :unsubscribe do - unsubscribed - end - logger.info "#{self.class.name} unsubscribed" + _run_unsubscribe_callbacks { unsubscribed } end @@ -159,9 +164,22 @@ module ActionCable # the proper channel identifier marked as the recipient. def transmit(data, via: nil) logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit({ identifier: @identifier, message: data }.to_json) + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + end + + + protected + def defer_subscription_confirmation! + @defer_subscription_confirmation = true end + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + + def subscription_confirmation_sent? + @subscription_confirmation_sent + end private def delegate_connection_identifiers @@ -174,10 +192,8 @@ module ActionCable def subscribe_to_channel - logger.info "#{self.class.name} subscribing" - run_callbacks :subscribe do - subscribed - end + _run_subscribe_callbacks { subscribed } + transmit_subscription_confirmation unless defer_subscription_confirmation? end @@ -206,6 +222,15 @@ module ActionCable end end end + + def transmit_subscription_confirmation + unless subscription_confirmation_sent? + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE) + + @subscription_confirmation_sent = true + end + end end end end diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb index f050fc95c0..295d750e86 100644 --- a/lib/action_cable/channel/callbacks.rb +++ b/lib/action_cable/channel/callbacks.rb @@ -21,7 +21,6 @@ module ActionCable end alias_method :on_subscribe, :after_subscribe - def before_unsubscribe(*methods, &block) set_callback(:unsubscribe, :before, *methods, &block) end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 2d1506ee98..b5ffa17f72 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -69,12 +69,18 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - callback ||= default_stream_callback(broadcasting) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - pubsub.subscribe broadcasting, &callback - logger.info "#{self.class.name} is streaming from #{broadcasting}" + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end end # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index c63621c519..b672e00682 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,13 +1,16 @@ module ActionCable module Connection - autoload :Authorization, 'action_cable/connection/authorization' - autoload :Base, 'action_cable/connection/base' - autoload :Heartbeat, 'action_cable/connection/heartbeat' - autoload :Identification, 'action_cable/connection/identification' - autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :MessageBuffer, 'action_cable/connection/message_buffer' - autoload :WebSocket, 'action_cable/connection/web_socket' - autoload :Subscriptions, 'action_cable/connection/subscriptions' - autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Authorization + autoload :Base + autoload :Identification + autoload :InternalChannel + autoload :MessageBuffer + autoload :WebSocket + autoload :Subscriptions + autoload :TaggedLoggerProxy + end end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 08a75156a3..a629f29643 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,8 +1,8 @@ -require 'action_dispatch/http/request' +require 'action_dispatch' module ActionCable module Connection - # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent + # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. @@ -37,8 +37,8 @@ module ActionCable # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. # - # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes - # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection. + # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes + # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. # @@ -56,22 +56,21 @@ module ActionCable def initialize(server, env) @server, @env = server, env - @logger = new_tagged_logger + @logger = new_tagged_logger || server.logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @started_at = Time.now end - # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user. + # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. def process logger.info started_request_message - if websocket.possible? + if websocket.possible? && allow_request_origin? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } @@ -88,19 +87,18 @@ module ActionCable if websocket.alive? subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received data without a live websocket (#{data.inspect})" + logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" end end - # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the + # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. def transmit(data) websocket.transmit data end - # Close the websocket connection. + # Close the WebSocket connection. def close - logger.error "Closing connection" websocket.close end @@ -112,12 +110,21 @@ module ActionCable # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # This can be returned by a health check against the connection. def statistics - { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: subscriptions.identifiers, + request_id: @env['action_dispatch.request_id'] + } + end + + def beat + transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i) end protected - # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc. + # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application @@ -125,7 +132,7 @@ module ActionCable end end - # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks. + # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks. def cookies request.cookie_jar end @@ -133,16 +140,15 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :subscriptions, :message_buffer def on_open - server.add_connection(self) - connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! + server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError respond_to_invalid_request close @@ -159,12 +165,22 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end + def allow_request_origin? + return true if server.config.disable_request_forgery_protection + + if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN'] + true + else + logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") + false + end + end + def respond_to_successful_request websocket.rack_response end @@ -177,25 +193,27 @@ module ActionCable # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. def new_tagged_logger - TaggedLoggerProxy.new server.logger, - tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } + if server.logger.respond_to?(:tagged) + TaggedLoggerProxy.new server.logger, + tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } + end end def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb index 1be6f9ac76..431493aa70 100644 --- a/lib/action_cable/connection/identification.rb +++ b/lib/action_cable/connection/identification.rb @@ -1,3 +1,5 @@ +require 'set' + module ActionCable module Connection module Identification @@ -22,12 +24,22 @@ module ActionCable # Return a single connection identifier that combines the value of all the registered identifiers into a single gid. def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + unless defined? @connection_identifier + @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + @connection_identifier end private def connection_gid(ids) - ids.map { |o| o.to_global_id.to_s }.sort.join(":") + ids.map do |o| + if o.respond_to? :to_global_id + o.to_global_id + else + o.to_s + end + end.sort.join(":") end end end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index b00e21824c..c065a24ab7 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_redis_subscriptions ||= [] @_internal_redis_subscriptions << [ internal_redis_channel, callback ] - pubsub.subscribe(internal_redis_channel, &callback) + EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb index d5a8e9eba9..25cff75b41 100644 --- a/lib/action_cable/connection/message_buffer.rb +++ b/lib/action_cable/connection/message_buffer.rb @@ -1,6 +1,6 @@ module ActionCable module Connection - # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them. + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. # Entirely internal operation and should not be used directly by the user. class MessageBuffer def initialize(connection) @@ -50,4 +50,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 69e3f60706..229be2a316 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index 854f613f1c..34063c1d42 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -4,6 +4,8 @@ module ActionCable # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy + attr_reader :tags + def initialize(logger, tags:) @logger = logger @tags = tags.flatten @@ -22,7 +24,8 @@ module ActionCable protected def log(type, message) - @logger.tagged(*@tags) { @logger.send type, message } + current_tags = tags - @logger.formatter.current_tags + @logger.tagged(*current_tags) { @logger.send type, message } end end end diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb index 135a28cfe4..169b683b8c 100644 --- a/lib/action_cable/connection/web_socket.rb +++ b/lib/action_cable/connection/web_socket.rb @@ -1,3 +1,5 @@ +require 'faye/websocket' + module ActionCable module Connection # Decorate the Faye::WebSocket with helpers we need. diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb index 6c943c7971..613a9b99f2 100644 --- a/lib/action_cable/engine.rb +++ b/lib/action_cable/engine.rb @@ -1,4 +1,22 @@ +require 'rails/engine' +require 'active_support/ordered_options' + module ActionCable class Engine < ::Rails::Engine + config.action_cable = ActiveSupport::OrderedOptions.new + + initializer "action_cable.logger" do + ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger } + end + + initializer "action_cable.set_configs" do |app| + options = app.config.action_cable + + options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? + + ActiveSupport.on_load(:action_cable) do + options.each { |k,v| send("#{k}=", v) } + end + end end end diff --git a/lib/action_cable/process/logging.rb b/lib/action_cable/process/logging.rb index bcceff4bec..72b1a080d1 100644 --- a/lib/action_cable/process/logging.rb +++ b/lib/action_cable/process/logging.rb @@ -1,3 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' +require 'celluloid' + EM.error_handler do |e| puts "Error raised inside the event loop: #{e.message}" puts e.backtrace.join("\n") diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 919ebd94de..a2a89d5f1e 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,11 +1,19 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server - autoload :Base, 'action_cable/server/base' - autoload :Broadcasting, 'action_cable/server/broadcasting' - autoload :Connections, 'action_cable/server/connections' - autoload :Configuration, 'action_cable/server/configuration' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Connections + autoload :Configuration - autoload :Worker, 'action_cable/server/worker' - autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections' + autoload :Worker + autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' + end end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 43849928b9..f1585dc776 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -1,3 +1,5 @@ +require 'em-hiredis' + module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -18,6 +20,7 @@ module ActionCable # Called by rack to setup the server. def call(env) + setup_heartbeat_timer config.connection_class.new(self, env).process end @@ -65,5 +68,7 @@ module ActionCable config.connection_class.identifiers end end + + ActiveSupport.run_load_hooks(:action_cable, Base.config) end end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 037b98951e..6e0fbae387 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,3 +1,5 @@ +require 'redis' + module ActionCable module Server # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these @@ -44,9 +46,9 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.broadcasting_redis.publish broadcasting, message.to_json + server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message) end end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb index ac9fa7b085..b22de273b8 100644 --- a/lib/action_cable/server/configuration.rb +++ b/lib/action_cable/server/configuration.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Server # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points @@ -6,6 +8,7 @@ module ActionCable attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size attr_accessor :redis_path, :channels_path + attr_accessor :disable_request_forgery_protection, :allowed_request_origins def initialize @logger = Rails.logger @@ -16,6 +19,8 @@ module ActionCable @redis_path = Rails.root.join('config/redis/cable.yml') @channels_path = Rails.root.join('app/channels') + + @disable_request_forgery_protection = false end def channel_paths diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb index 15d7c3c8c7..47dcea8c20 100644 --- a/lib/action_cable/server/connections.rb +++ b/lib/action_cable/server/connections.rb @@ -4,6 +4,8 @@ module ActionCable # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. # As such, this is primarily for internal use. module Connections + BEAT_INTERVAL = 3 + def connections @connections ||= [] end @@ -16,9 +18,20 @@ module ActionCable connections.delete connection end + # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end + end + def open_connections_statistics connections.map(&:statistics) end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb index d7823ecf93..e063b2a2e1 100644 --- a/lib/action_cable/server/worker.rb +++ b/lib/action_cable/server/worker.rb @@ -1,3 +1,6 @@ +require 'celluloid' +require 'active_support/callbacks' + module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. @@ -5,10 +8,13 @@ module ActionCable include ActiveSupport::Callbacks include Celluloid + attr_reader :connection define_callbacks :work - include ClearDatabaseConnections + include ActiveRecordConnectionManagement def invoke(receiver, method, *args) + @connection = receiver + run_callbacks :work do receiver.send method, *args end @@ -20,6 +26,8 @@ module ActionCable end def run_periodic_timer(channel, callback) + @connection = channel.connection + run_callbacks :work do callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end @@ -31,4 +39,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb index 722d363a41..1ede0095f8 100644 --- a/lib/action_cable/server/worker/clear_database_connections.rb +++ b/lib/action_cable/server/worker/active_record_connection_management.rb @@ -2,7 +2,7 @@ module ActionCable module Server class Worker # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. - module ClearDatabaseConnections + module ActiveRecordConnectionManagement extend ActiveSupport::Concern included do @@ -12,7 +12,7 @@ module ActionCable end def with_database_connections - yield + ActiveRecord::Base.logger.tagged(*connection.logger.tags) { yield } ensure ActiveRecord::Base.clear_active_connections! end diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.coffee index 0bd1757505..476d90ef72 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.coffee @@ -3,6 +3,8 @@ @Cable = PING_IDENTIFIER: "_ping" + INTERNAL_MESSAGES: + SUBSCRIPTION_CONFIRMATION: 'confirm_subscription' createConsumer: (url) -> new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee new file mode 100644 index 0000000000..33159130c7 --- /dev/null +++ b/lib/assets/javascripts/cable/connection.coffee @@ -0,0 +1,80 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. +class Cable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + if type? + switch type + when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION + @consumer.subscriptions.notify(identifier, "connected") + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") + + toJSON: -> + state: @getState() diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee deleted file mode 100644 index 464f0c1ff7..0000000000 --- a/lib/assets/javascripts/cable/connection.js.coffee +++ /dev/null @@ -1,77 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. -class Cable.Connection - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: -> - return if @isState("open", "connecting") - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - - close: -> - return if @isState("closed", "closing") - @webSocket?.close() - - reopen: -> - if @isOpen() - @closeSilently => @open() - else - @open() - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - closeSilently: (callback = ->) -> - @uninstallEventHandlers() - @installEventHandler("close", callback) - @installEventHandler("error", callback) - try - @webSocket.close() - finally - @uninstallEventHandlers() - - installEventHandlers: -> - for eventName of @events - @installEventHandler(eventName) - - installEventHandler: (eventName, handler) -> - handler ?= @events[eventName].bind(this) - @webSocket.addEventListener(eventName, handler) - - uninstallEventHandlers: -> - for eventName of @events - @webSocket.removeEventListener(eventName) - - events: - message: (event) -> - {identifier, message} = JSON.parse(event.data) - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @consumer.subscriptions.reload() - - close: -> - @consumer.subscriptions.notifyAll("disconnected") - - error: -> - @consumer.subscriptions.notifyAll("disconnected") - @closeSilently() - - toJSON: -> - state: @getState() diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.coffee index cac65d9043..bf99dee34d 100644 --- a/lib/assets/javascripts/cable/connection_monitor.js.coffee +++ b/lib/assets/javascripts/cable/connection_monitor.coffee @@ -1,15 +1,13 @@ # Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting # revival reconnections if things go astray. Internal class, not intended for direct user manipulation. class Cable.ConnectionMonitor - identifier: Cable.PING_IDENTIFIER - - pollInterval: - min: 2 + @pollInterval: + min: 3 max: 30 - staleThreshold: - startedAt: 4 - pingedAt: 8 + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: Cable.PING_IDENTIFIER constructor: (@consumer) -> @consumer.subscriptions.add(this) @@ -18,6 +16,10 @@ class Cable.ConnectionMonitor connected: -> @reset() @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() received: -> @pingedAt = now() @@ -30,9 +32,11 @@ class Cable.ConnectionMonitor delete @stoppedAt @startedAt = now() @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) stop: -> @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) poll: -> setTimeout => @@ -42,20 +46,28 @@ class Cable.ConnectionMonitor , @getInterval() getInterval: -> - {min, max} = @pollInterval - interval = 4 * Math.log(@reconnectAttempts + 1) + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) clamp(interval, min, max) * 1000 reconnectIfStale: -> if @connectionIsStale() - @reconnectAttempts += 1 - @consumer.connection.reopen() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() connectionIsStale: -> - if @pingedAt - secondsSince(@pingedAt) > @staleThreshold.pingedAt - else - secondsSince(@startedAt) > @staleThreshold.startedAt + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 toJSON: -> interval = @getInterval() diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.coffee index 05a7398e79..05a7398e79 100644 --- a/lib/assets/javascripts/cable/consumer.js.coffee +++ b/lib/assets/javascripts/cable/consumer.coffee diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.coffee index 5b024d4e15..5b024d4e15 100644 --- a/lib/assets/javascripts/cable/subscription.js.coffee +++ b/lib/assets/javascripts/cable/subscription.coffee diff --git a/lib/assets/javascripts/cable/subscriptions.js.coffee b/lib/assets/javascripts/cable/subscriptions.coffee index fe6975c870..4efb384ee2 100644 --- a/lib/assets/javascripts/cable/subscriptions.js.coffee +++ b/lib/assets/javascripts/cable/subscriptions.coffee @@ -9,6 +9,7 @@ class Cable.Subscriptions constructor: (@consumer) -> @subscriptions = [] + @history = [] create: (channelName, mixin) -> channel = channelName @@ -20,13 +21,11 @@ class Cable.Subscriptions add: (subscription) -> @subscriptions.push(subscription) @notify(subscription, "initialized") - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") reload: -> for subscription in @subscriptions - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") remove: (subscription) -> @subscriptions = (s for s in @subscriptions when s isnt subscription) @@ -49,6 +48,10 @@ class Cable.Subscriptions for subscription in subscriptions subscription[callbackName]?(args...) + if callbackName in ["initialized", "connected", "disconnected"] + {identifier} = subscription + @record(notification: {identifier, callbackName, args}) + sendCommand: (subscription, command) -> {identifier} = subscription if identifier is Cable.PING_IDENTIFIER @@ -56,5 +59,11 @@ class Cable.Subscriptions else @consumer.send({command, identifier}) + record: (data) -> + data.time = new Date() + @history = @history.slice(-19) + @history.push(data) + toJSON: -> - subscription.identifier for subscription in @subscriptions + history: @history + identifiers: (subscription.identifier for subscription in @subscriptions) |