diff options
Diffstat (limited to 'lib/action_cable/connection')
-rw-r--r-- | lib/action_cable/connection/base.rb | 62 | ||||
-rw-r--r-- | lib/action_cable/connection/heartbeat.rb | 30 | ||||
-rw-r--r-- | lib/action_cable/connection/identification.rb | 16 | ||||
-rw-r--r-- | lib/action_cable/connection/internal_channel.rb | 4 | ||||
-rw-r--r-- | lib/action_cable/connection/message_buffer.rb | 4 | ||||
-rw-r--r-- | lib/action_cable/connection/subscriptions.rb | 2 | ||||
-rw-r--r-- | lib/action_cable/connection/tagged_logger_proxy.rb | 5 | ||||
-rw-r--r-- | lib/action_cable/connection/web_socket.rb | 2 |
8 files changed, 65 insertions, 60 deletions
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 84393845c4..a629f29643 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,8 +1,8 @@ -require 'action_dispatch/http/request' +require 'action_dispatch' module ActionCable module Connection - # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent + # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. @@ -37,8 +37,8 @@ module ActionCable # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. # - # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes - # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection. + # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes + # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. # @@ -59,19 +59,18 @@ module ActionCable @logger = new_tagged_logger || server.logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @started_at = Time.now end - # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user. + # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. def process logger.info started_request_message - if websocket.possible? + if websocket.possible? && allow_request_origin? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } @@ -88,19 +87,18 @@ module ActionCable if websocket.alive? subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received data without a live websocket (#{data.inspect})" + logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" end end - # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the + # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. def transmit(data) websocket.transmit data end - # Close the websocket connection. + # Close the WebSocket connection. def close - logger.error "Closing connection" websocket.close end @@ -112,12 +110,21 @@ module ActionCable # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # This can be returned by a health check against the connection. def statistics - { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: subscriptions.identifiers, + request_id: @env['action_dispatch.request_id'] + } + end + + def beat + transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i) end protected - # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc. + # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application @@ -125,7 +132,7 @@ module ActionCable end end - # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks. + # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks. def cookies request.cookie_jar end @@ -133,16 +140,15 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :subscriptions, :message_buffer def on_open - server.add_connection(self) - connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! + server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError respond_to_invalid_request close @@ -159,12 +165,22 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end + def allow_request_origin? + return true if server.config.disable_request_forgery_protection + + if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN'] + true + else + logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") + false + end + end + def respond_to_successful_request websocket.rack_response end @@ -187,17 +203,17 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb index 701e6885ad..431493aa70 100644 --- a/lib/action_cable/connection/identification.rb +++ b/lib/action_cable/connection/identification.rb @@ -1,3 +1,5 @@ +require 'set' + module ActionCable module Connection module Identification @@ -22,12 +24,22 @@ module ActionCable # Return a single connection identifier that combines the value of all the registered identifiers into a single gid. def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + unless defined? @connection_identifier + @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + @connection_identifier end private def connection_gid(ids) - ids.map { |o| (o.try(:to_global_id) || o).to_s }.sort.join(":") + ids.map do |o| + if o.respond_to? :to_global_id + o.to_global_id + else + o.to_s + end + end.sort.join(":") end end end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index b00e21824c..c065a24ab7 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_redis_subscriptions ||= [] @_internal_redis_subscriptions << [ internal_redis_channel, callback ] - pubsub.subscribe(internal_redis_channel, &callback) + EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb index d5a8e9eba9..25cff75b41 100644 --- a/lib/action_cable/connection/message_buffer.rb +++ b/lib/action_cable/connection/message_buffer.rb @@ -1,6 +1,6 @@ module ActionCable module Connection - # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them. + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. # Entirely internal operation and should not be used directly by the user. class MessageBuffer def initialize(connection) @@ -50,4 +50,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 69e3f60706..229be2a316 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index 854f613f1c..34063c1d42 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -4,6 +4,8 @@ module ActionCable # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy + attr_reader :tags + def initialize(logger, tags:) @logger = logger @tags = tags.flatten @@ -22,7 +24,8 @@ module ActionCable protected def log(type, message) - @logger.tagged(*@tags) { @logger.send type, message } + current_tags = tags - @logger.formatter.current_tags + @logger.tagged(*current_tags) { @logger.send type, message } end end end diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb index 135a28cfe4..169b683b8c 100644 --- a/lib/action_cable/connection/web_socket.rb +++ b/lib/action_cable/connection/web_socket.rb @@ -1,3 +1,5 @@ +require 'faye/websocket' + module ActionCable module Connection # Decorate the Faye::WebSocket with helpers we need. |