diff options
Diffstat (limited to 'lib')
31 files changed, 445 insertions, 253 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 13c5c77578..3919812161 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -1,33 +1,31 @@ -require 'eventmachine' -EM.epoll - -require 'set' - require 'active_support' -require 'active_support/json' -require 'active_support/concern' -require 'active_support/core_ext/hash/indifferent_access' -require 'active_support/core_ext/module/delegation' -require 'active_support/callbacks' - -require 'faye/websocket' -require 'celluloid' -require 'em-hiredis' -require 'redis' - -require 'action_cable/engine' if defined?(Rails) +require 'active_support/rails' require 'action_cable/version' module ActionCable - autoload :Server, 'action_cable/server' - autoload :Connection, 'action_cable/connection' - autoload :Channel, 'action_cable/channel' + extend ActiveSupport::Autoload - autoload :RemoteConnections, 'action_cable/remote_connections' - autoload :Broadcaster, 'action_cable/broadcaster' + INTERNAL = { + identifiers: { + ping: '_ping'.freeze + }, + message_types: { + confirmation: 'confirm_subscription'.freeze, + rejection: 'reject_subscription'.freeze + } + } # Singleton instance of the server module_function def server @server ||= ActionCable::Server::Base.new end + + eager_autoload do + autoload :Server + autoload :Connection + autoload :Channel + autoload :RemoteConnections + end end + +require 'action_cable/engine' if defined?(Rails) diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb index 3b973ba0a7..7ae262ce5f 100644 --- a/lib/action_cable/channel.rb +++ b/lib/action_cable/channel.rb @@ -1,10 +1,14 @@ module ActionCable module Channel - autoload :Base, 'action_cable/channel/base' - autoload :Broadcasting, 'action_cable/channel/broadcasting' - autoload :Callbacks, 'action_cable/channel/callbacks' - autoload :Naming, 'action_cable/channel/naming' - autoload :PeriodicTimers, 'action_cable/channel/periodic_timers' - autoload :Streams, 'action_cable/channel/streams' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Callbacks + autoload :Naming + autoload :PeriodicTimers + autoload :Streams + end end end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 2f1b4a187d..ca903a810d 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -1,6 +1,8 @@ +require 'set' + module ActionCable module Channel - # The channel provides the basic structure of grouping behavior into logical units when communicating over the websocket connection. + # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection. # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply # responding to the subscriber's direct requests. # @@ -64,6 +66,22 @@ module ActionCable # # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # + # == Rejecting subscription requests + # + # A channel can reject a subscription request in the #subscribed callback by invoking #reject! + # + # Example: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # reject unless current_user.can_access?(@room) + # end + # end + # + # In this example, the subscription will be rejected if the current_user does not have access to the chat room. + # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. class Base include Callbacks include PeriodicTimers @@ -71,10 +89,7 @@ module ActionCable include Naming include Broadcasting - on_subscribe :subscribed - on_unsubscribe :unsubscribed - - attr_reader :params, :connection + attr_reader :params, :connection, :identifier delegate :logger, to: :connection class << self @@ -118,6 +133,10 @@ module ActionCable @identifier = identifier @params = params + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + delegate_connection_identifiers subscribe_to_channel end @@ -138,8 +157,9 @@ module ActionCable # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel - run_unsubscribe_callbacks - logger.info "#{self.class.name} unsubscribed" + run_callbacks :unsubscribe do + unsubscribed + end end @@ -160,9 +180,28 @@ module ActionCable # the proper channel identifier marked as the recipient. def transmit(data, via: nil) logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit({ identifier: @identifier, message: data }.to_json) + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) end + def defer_subscription_confirmation! + @defer_subscription_confirmation = true + end + + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + + def subscription_confirmation_sent? + @subscription_confirmation_sent + end + + def reject + @reject_subscription = true + end + + def subscription_rejected? + @reject_subscription + end private def delegate_connection_identifiers @@ -175,8 +214,15 @@ module ActionCable def subscribe_to_channel - logger.info "#{self.class.name} subscribing" - run_subscribe_callbacks + run_callbacks :subscribe do + subscribed + end + + if subscription_rejected? + reject_subscription + else + transmit_subscription_confirmation unless defer_subscription_confirmation? + end end @@ -206,12 +252,22 @@ module ActionCable end end - def run_subscribe_callbacks - self.class.on_subscribe_callbacks.each { |callback| send(callback) } + def transmit_subscription_confirmation + unless subscription_confirmation_sent? + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + @subscription_confirmation_sent = true + end + end + + def reject_subscription + connection.subscriptions.remove_subscription self + transmit_subscription_rejection end - def run_unsubscribe_callbacks - self.class.on_unsubscribe_callbacks.each { |callback| send(callback) } + def transmit_subscription_rejection + logger.info "#{self.class.name} is transmitting the subscription rejection" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) end end end diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb index dcdd27b9a7..295d750e86 100644 --- a/lib/action_cable/channel/callbacks.rb +++ b/lib/action_cable/channel/callbacks.rb @@ -1,28 +1,35 @@ +require 'active_support/callbacks' + module ActionCable module Channel module Callbacks - extend ActiveSupport::Concern + extend ActiveSupport::Concern + include ActiveSupport::Callbacks included do - class_attribute :on_subscribe_callbacks, :on_unsubscribe_callbacks, instance_reader: false - - self.on_subscribe_callbacks = [] - self.on_unsubscribe_callbacks = [] + define_callbacks :subscribe + define_callbacks :unsubscribe end - module ClassMethods - # Name methods that should be called when the channel is subscribed to. - # (These methods should be private, so they're not callable by the user). - def on_subscribe(*methods) - self.on_subscribe_callbacks += methods + class_methods do + def before_subscribe(*methods, &block) + set_callback(:subscribe, :before, *methods, &block) + end + + def after_subscribe(*methods, &block) + set_callback(:subscribe, :after, *methods, &block) + end + alias_method :on_subscribe, :after_subscribe + + def before_unsubscribe(*methods, &block) + set_callback(:unsubscribe, :before, *methods, &block) end - # Name methods that should be called when the channel is unsubscribed from. - # (These methods should be private, so they're not callable by the user). - def on_unsubscribe(*methods) - self.on_unsubscribe_callbacks += methods + def after_unsubscribe(*methods, &block) + set_callback(:unsubscribe, :after, *methods, &block) end + alias_method :on_unsubscribe, :after_unsubscribe end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/channel/periodic_timers.rb b/lib/action_cable/channel/periodic_timers.rb index 9bdcc87aa5..25fe8e5e54 100644 --- a/lib/action_cable/channel/periodic_timers.rb +++ b/lib/action_cable/channel/periodic_timers.rb @@ -7,8 +7,8 @@ module ActionCable class_attribute :periodic_timers, instance_reader: false self.periodic_timers = [] - on_subscribe :start_periodic_timers - on_unsubscribe :stop_periodic_timers + after_subscribe :start_periodic_timers + after_unsubscribe :stop_periodic_timers end module ClassMethods @@ -38,4 +38,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index a37194b884..b5ffa17f72 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -69,12 +69,18 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - callback ||= default_stream_callback(broadcasting) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - pubsub.subscribe broadcasting, &callback - logger.info "#{self.class.name} is streaming from #{broadcasting}" + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end end # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a @@ -88,7 +94,7 @@ module ActionCable streams.each do |broadcasting, callback| pubsub.unsubscribe_proc broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" - end + end.clear end private diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index c63621c519..b672e00682 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,13 +1,16 @@ module ActionCable module Connection - autoload :Authorization, 'action_cable/connection/authorization' - autoload :Base, 'action_cable/connection/base' - autoload :Heartbeat, 'action_cable/connection/heartbeat' - autoload :Identification, 'action_cable/connection/identification' - autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :MessageBuffer, 'action_cable/connection/message_buffer' - autoload :WebSocket, 'action_cable/connection/web_socket' - autoload :Subscriptions, 'action_cable/connection/subscriptions' - autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Authorization + autoload :Base + autoload :Identification + autoload :InternalChannel + autoload :MessageBuffer + autoload :WebSocket + autoload :Subscriptions + autoload :TaggedLoggerProxy + end end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 08a75156a3..b93b6a8a50 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,8 +1,8 @@ -require 'action_dispatch/http/request' +require 'action_dispatch' module ActionCable module Connection - # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent + # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. @@ -37,8 +37,8 @@ module ActionCable # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. # - # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes - # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection. + # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes + # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. # @@ -48,7 +48,7 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env + attr_reader :server, :env, :subscriptions delegate :worker_pool, :pubsub, to: :server attr_reader :logger @@ -59,19 +59,18 @@ module ActionCable @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @started_at = Time.now end - # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user. + # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. def process logger.info started_request_message - if websocket.possible? + if websocket.possible? && allow_request_origin? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } @@ -88,19 +87,18 @@ module ActionCable if websocket.alive? subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received data without a live websocket (#{data.inspect})" + logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" end end - # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the + # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. def transmit(data) websocket.transmit data end - # Close the websocket connection. + # Close the WebSocket connection. def close - logger.error "Closing connection" websocket.close end @@ -112,12 +110,21 @@ module ActionCable # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # This can be returned by a health check against the connection. def statistics - { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: subscriptions.identifiers, + request_id: @env['action_dispatch.request_id'] + } + end + + def beat + transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end protected - # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc. + # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application @@ -125,7 +132,7 @@ module ActionCable end end - # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks. + # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks. def cookies request.cookie_jar end @@ -133,19 +140,17 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :message_buffer def on_open - server.add_connection(self) - connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! + server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError respond_to_invalid_request - close end def on_message(message) @@ -159,17 +164,29 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end + def allow_request_origin? + return true if server.config.disable_request_forgery_protection + + if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN'] + true + else + logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") + false + end + end + def respond_to_successful_request websocket.rack_response end def respond_to_invalid_request + close if websocket.alive? + logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -185,17 +202,17 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb index 1be6f9ac76..2d75ff8d6d 100644 --- a/lib/action_cable/connection/identification.rb +++ b/lib/action_cable/connection/identification.rb @@ -1,3 +1,5 @@ +require 'set' + module ActionCable module Connection module Identification @@ -22,12 +24,22 @@ module ActionCable # Return a single connection identifier that combines the value of all the registered identifiers into a single gid. def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + unless defined? @connection_identifier + @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + @connection_identifier end private def connection_gid(ids) - ids.map { |o| o.to_global_id.to_s }.sort.join(":") + ids.map do |o| + if o.respond_to? :to_gid_param + o.to_gid_param + else + o.to_s + end + end.sort.join(":") end end end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index b00e21824c..c065a24ab7 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_redis_subscriptions ||= [] @_internal_redis_subscriptions << [ internal_redis_channel, callback ] - pubsub.subscribe(internal_redis_channel, &callback) + EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb index d5a8e9eba9..25cff75b41 100644 --- a/lib/action_cable/connection/message_buffer.rb +++ b/lib/action_cable/connection/message_buffer.rb @@ -1,6 +1,6 @@ module ActionCable module Connection - # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them. + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. # Entirely internal operation and should not be used directly by the user. class MessageBuffer def initialize(connection) @@ -50,4 +50,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 69e3f60706..6199db4898 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on @@ -35,8 +37,12 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - subscriptions[data['identifier']].unsubscribe_from_channel - subscriptions.delete(data['identifier']) + remove_subscription subscriptions[data['identifier']] + end + + def remove_subscription(subscription) + subscription.unsubscribe_from_channel + subscriptions.delete(subscription.identifier) end def perform_action(data) diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index 854f613f1c..e5319087fb 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -4,6 +4,8 @@ module ActionCable # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy + attr_reader :tags + def initialize(logger, tags:) @logger = logger @tags = tags.flatten @@ -14,6 +16,15 @@ module ActionCable @tags = @tags.uniq end + def tag(logger) + if logger.respond_to?(:tagged) + current_tags = tags - logger.formatter.current_tags + logger.tagged(*current_tags) { yield } + else + yield + end + end + %i( debug info warn error fatal unknown ).each do |severity| define_method(severity) do |message| log severity, message @@ -22,7 +33,7 @@ module ActionCable protected def log(type, message) - @logger.tagged(*@tags) { @logger.send type, message } + tag(@logger) { @logger.send type, message } end end end diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb index 135a28cfe4..169b683b8c 100644 --- a/lib/action_cable/connection/web_socket.rb +++ b/lib/action_cable/connection/web_socket.rb @@ -1,3 +1,5 @@ +require 'faye/websocket' + module ActionCable module Connection # Decorate the Faye::WebSocket with helpers we need. diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb index 6c943c7971..613a9b99f2 100644 --- a/lib/action_cable/engine.rb +++ b/lib/action_cable/engine.rb @@ -1,4 +1,22 @@ +require 'rails/engine' +require 'active_support/ordered_options' + module ActionCable class Engine < ::Rails::Engine + config.action_cable = ActiveSupport::OrderedOptions.new + + initializer "action_cable.logger" do + ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger } + end + + initializer "action_cable.set_configs" do |app| + options = app.config.action_cable + + options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? + + ActiveSupport.on_load(:action_cable) do + options.each { |k,v| send("#{k}=", v) } + end + end end end diff --git a/lib/action_cable/process/logging.rb b/lib/action_cable/process/logging.rb index 827a58fdb8..618ba7357a 100644 --- a/lib/action_cable/process/logging.rb +++ b/lib/action_cable/process/logging.rb @@ -1,3 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' +require 'celluloid' + EM.error_handler do |e| puts "Error raised inside the event loop: #{e.message}" puts e.backtrace.join("\n") diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 919ebd94de..a2a89d5f1e 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,11 +1,19 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server - autoload :Base, 'action_cable/server/base' - autoload :Broadcasting, 'action_cable/server/broadcasting' - autoload :Connections, 'action_cable/server/connections' - autoload :Configuration, 'action_cable/server/configuration' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Connections + autoload :Configuration - autoload :Worker, 'action_cable/server/worker' - autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections' + autoload :Worker + autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' + end end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 43849928b9..f1585dc776 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -1,3 +1,5 @@ +require 'em-hiredis' + module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -18,6 +20,7 @@ module ActionCable # Called by rack to setup the server. def call(env) + setup_heartbeat_timer config.connection_class.new(self, env).process end @@ -65,5 +68,7 @@ module ActionCable config.connection_class.identifiers end end + + ActiveSupport.run_load_hooks(:action_cable, Base.config) end end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 037b98951e..6e0fbae387 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,3 +1,5 @@ +require 'redis' + module ActionCable module Server # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these @@ -44,9 +46,9 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.broadcasting_redis.publish broadcasting, message.to_json + server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message) end end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb index 4808d170ff..89a0caddb4 100644 --- a/lib/action_cable/server/configuration.rb +++ b/lib/action_cable/server/configuration.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Server # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points @@ -6,6 +8,7 @@ module ActionCable attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size attr_accessor :redis_path, :channels_path + attr_accessor :disable_request_forgery_protection, :allowed_request_origins def initialize @logger = Rails.logger @@ -16,6 +19,8 @@ module ActionCable @redis_path = Rails.root.join('config/redis/cable.yml') @channels_path = Rails.root.join('app/channels') + + @disable_request_forgery_protection = false end def log_to_stdout diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb index 15d7c3c8c7..47dcea8c20 100644 --- a/lib/action_cable/server/connections.rb +++ b/lib/action_cable/server/connections.rb @@ -4,6 +4,8 @@ module ActionCable # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. # As such, this is primarily for internal use. module Connections + BEAT_INTERVAL = 3 + def connections @connections ||= [] end @@ -16,9 +18,20 @@ module ActionCable connections.delete connection end + # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end + end + def open_connections_statistics connections.map(&:statistics) end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb index d7823ecf93..e063b2a2e1 100644 --- a/lib/action_cable/server/worker.rb +++ b/lib/action_cable/server/worker.rb @@ -1,3 +1,6 @@ +require 'celluloid' +require 'active_support/callbacks' + module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. @@ -5,10 +8,13 @@ module ActionCable include ActiveSupport::Callbacks include Celluloid + attr_reader :connection define_callbacks :work - include ClearDatabaseConnections + include ActiveRecordConnectionManagement def invoke(receiver, method, *args) + @connection = receiver + run_callbacks :work do receiver.send method, *args end @@ -20,6 +26,8 @@ module ActionCable end def run_periodic_timer(channel, callback) + @connection = channel.connection + run_callbacks :work do callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end @@ -31,4 +39,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb index 722d363a41..ecece4e270 100644 --- a/lib/action_cable/server/worker/clear_database_connections.rb +++ b/lib/action_cable/server/worker/active_record_connection_management.rb @@ -2,7 +2,7 @@ module ActionCable module Server class Worker # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. - module ClearDatabaseConnections + module ActiveRecordConnectionManagement extend ActiveSupport::Concern included do @@ -12,7 +12,7 @@ module ActionCable end def with_database_connections - yield + connection.logger.tag(ActiveRecord::Base.logger) { yield } ensure ActiveRecord::Base.clear_active_connections! end diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.coffee.erb index 0bd1757505..8498233c11 100644 --- a/lib/assets/javascripts/cable.js.coffee +++ b/lib/assets/javascripts/cable.coffee.erb @@ -2,7 +2,7 @@ #= require cable/consumer @Cable = - PING_IDENTIFIER: "_ping" + INTERNAL: <%= ActionCable::INTERNAL.to_json %> createConsumer: (url) -> new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee new file mode 100644 index 0000000000..b2abe8dcb2 --- /dev/null +++ b/lib/assets/javascripts/cable/connection.coffee @@ -0,0 +1,84 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +{message_types} = Cable.INTERNAL + +class Cable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + switch type + when message_types.confirmation + @consumer.subscriptions.notify(identifier, "connected") + when message_types.rejection + @consumer.subscriptions.reject(identifier) + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") + + toJSON: -> + state: @getState() diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee deleted file mode 100644 index 464f0c1ff7..0000000000 --- a/lib/assets/javascripts/cable/connection.js.coffee +++ /dev/null @@ -1,77 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. -class Cable.Connection - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: -> - return if @isState("open", "connecting") - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - - close: -> - return if @isState("closed", "closing") - @webSocket?.close() - - reopen: -> - if @isOpen() - @closeSilently => @open() - else - @open() - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - closeSilently: (callback = ->) -> - @uninstallEventHandlers() - @installEventHandler("close", callback) - @installEventHandler("error", callback) - try - @webSocket.close() - finally - @uninstallEventHandlers() - - installEventHandlers: -> - for eventName of @events - @installEventHandler(eventName) - - installEventHandler: (eventName, handler) -> - handler ?= @events[eventName].bind(this) - @webSocket.addEventListener(eventName, handler) - - uninstallEventHandlers: -> - for eventName of @events - @webSocket.removeEventListener(eventName) - - events: - message: (event) -> - {identifier, message} = JSON.parse(event.data) - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @consumer.subscriptions.reload() - - close: -> - @consumer.subscriptions.notifyAll("disconnected") - - error: -> - @consumer.subscriptions.notifyAll("disconnected") - @closeSilently() - - toJSON: -> - state: @getState() diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.coffee index cac65d9043..435efcc361 100644 --- a/lib/assets/javascripts/cable/connection_monitor.js.coffee +++ b/lib/assets/javascripts/cable/connection_monitor.coffee @@ -1,15 +1,13 @@ # Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting # revival reconnections if things go astray. Internal class, not intended for direct user manipulation. class Cable.ConnectionMonitor - identifier: Cable.PING_IDENTIFIER - - pollInterval: - min: 2 + @pollInterval: + min: 3 max: 30 - staleThreshold: - startedAt: 4 - pingedAt: 8 + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: Cable.INTERNAL.identifiers.ping constructor: (@consumer) -> @consumer.subscriptions.add(this) @@ -18,6 +16,10 @@ class Cable.ConnectionMonitor connected: -> @reset() @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() received: -> @pingedAt = now() @@ -30,9 +32,11 @@ class Cable.ConnectionMonitor delete @stoppedAt @startedAt = now() @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) stop: -> @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) poll: -> setTimeout => @@ -42,20 +46,28 @@ class Cable.ConnectionMonitor , @getInterval() getInterval: -> - {min, max} = @pollInterval - interval = 4 * Math.log(@reconnectAttempts + 1) + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) clamp(interval, min, max) * 1000 reconnectIfStale: -> if @connectionIsStale() - @reconnectAttempts += 1 - @consumer.connection.reopen() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() connectionIsStale: -> - if @pingedAt - secondsSince(@pingedAt) > @staleThreshold.pingedAt - else - secondsSince(@startedAt) > @staleThreshold.startedAt + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 toJSON: -> interval = @getInterval() diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.coffee index 05a7398e79..05a7398e79 100644 --- a/lib/assets/javascripts/cable/consumer.js.coffee +++ b/lib/assets/javascripts/cable/consumer.coffee diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.coffee index 5b024d4e15..5b024d4e15 100644 --- a/lib/assets/javascripts/cable/subscription.js.coffee +++ b/lib/assets/javascripts/cable/subscription.coffee diff --git a/lib/assets/javascripts/cable/subscriptions.js.coffee b/lib/assets/javascripts/cable/subscriptions.coffee index fe6975c870..7955565f06 100644 --- a/lib/assets/javascripts/cable/subscriptions.js.coffee +++ b/lib/assets/javascripts/cable/subscriptions.coffee @@ -9,6 +9,7 @@ class Cable.Subscriptions constructor: (@consumer) -> @subscriptions = [] + @history = [] create: (channelName, mixin) -> channel = channelName @@ -20,22 +21,29 @@ class Cable.Subscriptions add: (subscription) -> @subscriptions.push(subscription) @notify(subscription, "initialized") - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") - - reload: -> - for subscription in @subscriptions - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") remove: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) + @forget(subscription) + unless @findAll(subscription.identifier).length @sendCommand(subscription, "unsubscribe") + reject: (identifier) -> + for subscription in @findAll(identifier) + @forget(subscription) + @notify(subscription, "rejected") + + forget: (subscription) -> + @subscriptions = (s for s in @subscriptions when s isnt subscription) + findAll: (identifier) -> s for s in @subscriptions when s.identifier is identifier + reload: -> + for subscription in @subscriptions + @sendCommand(subscription, "subscribe") + notifyAll: (callbackName, args...) -> for subscription in @subscriptions @notify(subscription, callbackName, args...) @@ -49,12 +57,22 @@ class Cable.Subscriptions for subscription in subscriptions subscription[callbackName]?(args...) + if callbackName in ["initialized", "connected", "disconnected", "rejected"] + {identifier} = subscription + @record(notification: {identifier, callbackName, args}) + sendCommand: (subscription, command) -> {identifier} = subscription - if identifier is Cable.PING_IDENTIFIER + if identifier is Cable.INTERNAL.identifiers.ping @consumer.connection.isOpen() else @consumer.send({command, identifier}) + record: (data) -> + data.time = new Date() + @history = @history.slice(-19) + @history.push(data) + toJSON: -> - subscription.identifier for subscription in @subscriptions + history: @history + identifiers: (subscription.identifier for subscription in @subscriptions) |