aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/authorization.rb13
-rw-r--r--actioncable/lib/action_cable/connection/base.rb214
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb46
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb45
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb54
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb75
-rw-r--r--actioncable/lib/action_cable/connection/tagged_logger_proxy.rb40
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb29
8 files changed, 516 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb
new file mode 100644
index 0000000000..070a70e4e2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/authorization.rb
@@ -0,0 +1,13 @@
+module ActionCable
+ module Connection
+ module Authorization
+ class UnauthorizedError < StandardError; end
+
+ private
+ def reject_unauthorized_connection
+ logger.error "An unauthorized connection attempt was rejected"
+ raise UnauthorizedError
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
new file mode 100644
index 0000000000..977856d656
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -0,0 +1,214 @@
+require 'action_dispatch'
+
+module ActionCable
+ module Connection
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # authentication and authorization.
+ #
+ # Here's a basic example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ #
+ # def connect
+ # self.current_user = find_verified_user
+ # logger.add_tags current_user.name
+ # end
+ #
+ # def disconnect
+ # # Any cleanup work needed when the cable connection is cut.
+ # end
+ #
+ # protected
+ # def find_verified_user
+ # if current_user = User.find_by_identity cookies.signed[:identity_id]
+ # current_user
+ # else
+ # reject_unauthorized_connection
+ # end
+ # end
+ # end
+ # end
+ #
+ # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
+ # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
+ # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ #
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
+ #
+ # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ #
+ # Pretty simple, eh?
+ class Base
+ include Identification
+ include InternalChannel
+ include Authorization
+
+ attr_reader :server, :env, :subscriptions, :logger
+ delegate :worker_pool, :pubsub, to: :server
+
+ def initialize(server, env)
+ @server, @env = server, env
+
+ @logger = new_tagged_logger
+
+ @websocket = ActionCable::Connection::WebSocket.new(env)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @message_buffer = ActionCable::Connection::MessageBuffer.new(self)
+
+ @_internal_redis_subscriptions = nil
+ @started_at = Time.now
+ end
+
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
+ def process
+ logger.info started_request_message
+
+ if websocket.possible? && allow_request_origin?
+ websocket.on(:open) { |event| send_async :on_open }
+ websocket.on(:message) { |event| on_message event.data }
+ websocket.on(:close) { |event| send_async :on_close }
+
+ respond_to_successful_request
+ else
+ respond_to_invalid_request
+ end
+ end
+
+ # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
+ # The data is routed to the proper channel that the connection has subscribed to.
+ def receive(data_in_json)
+ if websocket.alive?
+ subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ else
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ end
+ end
+
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
+ # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
+ def transmit(data)
+ websocket.transmit data
+ end
+
+ # Close the WebSocket connection.
+ def close
+ websocket.close
+ end
+
+ # Invoke a method on the connection asynchronously through the pool of thread workers.
+ def send_async(method, *arguments)
+ worker_pool.async.invoke(self, method, *arguments)
+ end
+
+ # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # This can be returned by a health check against the connection.
+ def statistics
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
+ end
+
+ protected
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
+ def request
+ @request ||= begin
+ environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
+ ActionDispatch::Request.new(environment || env)
+ end
+ end
+
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
+ def cookies
+ request.cookie_jar
+ end
+
+ attr_reader :websocket
+ attr_reader :message_buffer
+
+ private
+ def on_open
+ connect if respond_to?(:connect)
+ subscribe_to_internal_channel
+ beat
+
+ message_buffer.process!
+ server.add_connection(self)
+ rescue ActionCable::Connection::Authorization::UnauthorizedError
+ respond_to_invalid_request
+ end
+
+ def on_message(message)
+ message_buffer.append message
+ end
+
+ def on_close
+ logger.info finished_request_message
+
+ server.remove_connection(self)
+
+ subscriptions.unsubscribe_from_all
+ unsubscribe_from_internal_channel
+
+ disconnect if respond_to?(:disconnect)
+ end
+
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
+ def respond_to_successful_request
+ websocket.rack_response
+ end
+
+ def respond_to_invalid_request
+ close if websocket.alive?
+
+ logger.info finished_request_message
+ [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
+ end
+
+ # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
+ def new_tagged_logger
+ TaggedLoggerProxy.new server.logger,
+ tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
+ end
+
+ def started_request_message
+ 'Started %s "%s"%s for %s at %s' % [
+ request.request_method,
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+
+ def finished_request_message
+ 'Finished "%s"%s for %s at %s' % [
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
new file mode 100644
index 0000000000..885ff3f102
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -0,0 +1,46 @@
+require 'set'
+
+module ActionCable
+ module Connection
+ module Identification
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :identifiers
+ self.identifiers = Set.new
+ end
+
+ class_methods do
+ # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
+ # Common identifiers are current_user and current_account, but could be anything really.
+ #
+ # Note that anything marked as an identifier will automatically create a delegate by the same name on any
+ # channel instances created off the connection.
+ def identified_by(*identifiers)
+ Array(identifiers).each { |identifier| attr_accessor identifier }
+ self.identifiers += identifiers
+ end
+ end
+
+ # Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
+ def connection_identifier
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
+ end
+
+ private
+ def connection_gid(ids)
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
new file mode 100644
index 0000000000..c065a24ab7
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -0,0 +1,45 @@
+module ActionCable
+ module Connection
+ # Makes it possible for the RemoteConnection to disconnect a specific connection.
+ module InternalChannel
+ extend ActiveSupport::Concern
+
+ private
+ def internal_redis_channel
+ "action_cable/#{connection_identifier}"
+ end
+
+ def subscribe_to_internal_channel
+ if connection_identifier.present?
+ callback = -> (message) { process_internal_message(message) }
+ @_internal_redis_subscriptions ||= []
+ @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+
+ EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ logger.info "Registered connection (#{connection_identifier})"
+ end
+ end
+
+ def unsubscribe_from_internal_channel
+ if @_internal_redis_subscriptions.present?
+ @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ end
+ end
+
+ def process_internal_message(message)
+ message = ActiveSupport::JSON.decode(message)
+
+ case message['type']
+ when 'disconnect'
+ logger.info "Removing connection (#{connection_identifier})"
+ websocket.close
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ close
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
new file mode 100644
index 0000000000..2f65a1e84a
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -0,0 +1,54 @@
+module ActionCable
+ module Connection
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
+ # Entirely internal operation and should not be used directly by the user.
+ class MessageBuffer
+ def initialize(connection)
+ @connection = connection
+ @buffered_messages = []
+ end
+
+ def append(message)
+ if valid? message
+ if processing?
+ receive message
+ else
+ buffer message
+ end
+ else
+ connection.logger.error "Couldn't handle non-string message: #{message.class}"
+ end
+ end
+
+ def processing?
+ @processing
+ end
+
+ def process!
+ @processing = true
+ receive_buffered_messages
+ end
+
+ protected
+ attr_reader :connection
+ attr_accessor :buffered_messages
+
+ private
+ def valid?(message)
+ message.is_a?(String)
+ end
+
+ def receive(message)
+ connection.send_async :receive, message
+ end
+
+ def buffer(message)
+ buffered_messages << message
+ end
+
+ def receive_buffered_messages
+ receive buffered_messages.shift until buffered_messages.empty?
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
new file mode 100644
index 0000000000..d7f95e6a62
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -0,0 +1,75 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
+module ActionCable
+ module Connection
+ # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
+ # the connection to the proper channel. Should not be used directly by the user.
+ class Subscriptions
+ def initialize(connection)
+ @connection = connection
+ @subscriptions = {}
+ end
+
+ def execute_command(data)
+ case data['command']
+ when 'subscribe' then add data
+ when 'unsubscribe' then remove data
+ when 'message' then perform_action data
+ else
+ logger.error "Received unrecognized command in #{data.inspect}"
+ end
+ rescue Exception => e
+ logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
+ end
+
+ def add(data)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+
+ subscription_klass = connection.server.channel_classes[id_options[:channel]]
+
+ if subscription_klass
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ else
+ logger.error "Subscription class not found (#{data.inspect})"
+ end
+ end
+
+ def remove(data)
+ logger.info "Unsubscribing from channel: #{data['identifier']}"
+ remove_subscription subscriptions[data['identifier']]
+ end
+
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
+ end
+
+ def perform_action(data)
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
+ end
+
+ def identifiers
+ subscriptions.keys
+ end
+
+ def unsubscribe_from_all
+ subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
+ end
+
+ protected
+ attr_reader :connection, :subscriptions
+
+ private
+ delegate :logger, to: :connection
+
+ def find(data)
+ if subscription = subscriptions[data['identifier']]
+ subscription
+ else
+ raise "Unable to find subscription with identifier: #{data['identifier']}"
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
new file mode 100644
index 0000000000..41afa9680a
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -0,0 +1,40 @@
+module ActionCable
+ module Connection
+ # Allows the use of per-connection tags against the server logger. This wouldn't work using the traditional
+ # <tt>ActiveSupport::TaggedLogging</tt> enhanced Rails.logger, as that logger will reset the tags between requests.
+ # The connection is long-lived, so it needs its own set of tags for its independent duration.
+ class TaggedLoggerProxy
+ attr_reader :tags
+
+ def initialize(logger, tags:)
+ @logger = logger
+ @tags = tags.flatten
+ end
+
+ def add_tags(*tags)
+ @tags += tags.flatten
+ @tags = @tags.uniq
+ end
+
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+
+ %i( debug info warn error fatal unknown ).each do |severity|
+ define_method(severity) do |message|
+ log severity, message
+ end
+ end
+
+ protected
+ def log(type, message)
+ tag(@logger) { @logger.send type, message }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
new file mode 100644
index 0000000000..670d5690ae
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -0,0 +1,29 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ # Decorate the Faye::WebSocket with helpers we need.
+ class WebSocket
+ delegate :rack_response, :close, :on, to: :websocket
+
+ def initialize(env)
+ @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ end
+
+ def possible?
+ websocket
+ end
+
+ def alive?
+ websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ websocket.send data
+ end
+
+ protected
+ attr_reader :websocket
+ end
+ end
+end