aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/authorization.rb15
-rw-r--r--actioncable/lib/action_cable/connection/base.rb262
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb157
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb47
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb45
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb54
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb117
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb136
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb79
-rw-r--r--actioncable/lib/action_cable/connection/tagged_logger_proxy.rb42
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb41
11 files changed, 995 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb
new file mode 100644
index 0000000000..aef3386f71
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/authorization.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Connection
+ module Authorization
+ class UnauthorizedError < StandardError; end
+
+ # Closes the WebSocket connection if it is open and returns a 404 "File not Found" response.
+ def reject_unauthorized_connection
+ logger.error "An unauthorized connection attempt was rejected"
+ raise UnauthorizedError
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
new file mode 100644
index 0000000000..0044afad98
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -0,0 +1,262 @@
+# frozen_string_literal: true
+
+require "action_dispatch"
+
+module ActionCable
+ module Connection
+ # For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
+ # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # authentication and authorization.
+ #
+ # Here's a basic example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ #
+ # def connect
+ # self.current_user = find_verified_user
+ # logger.add_tags current_user.name
+ # end
+ #
+ # def disconnect
+ # # Any cleanup work needed when the cable connection is cut.
+ # end
+ #
+ # private
+ # def find_verified_user
+ # User.find_by_identity(cookies.encrypted[:identity_id]) ||
+ # reject_unauthorized_connection
+ # end
+ # end
+ # end
+ #
+ # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
+ # established for that current_user (and potentially disconnect them). You can declare as many
+ # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
+ #
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
+ #
+ # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
+ #
+ # Pretty simple, eh?
+ class Base
+ include Identification
+ include InternalChannel
+ include Authorization
+
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
+ delegate :event_loop, :pubsub, to: :server
+
+ def initialize(server, env, coder: ActiveSupport::JSON)
+ @server, @env, @coder = server, env, coder
+
+ @worker_pool = server.worker_pool
+ @logger = new_tagged_logger
+
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @message_buffer = ActionCable::Connection::MessageBuffer.new(self)
+
+ @_internal_subscriptions = nil
+ @started_at = Time.now
+ end
+
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
+ def process #:nodoc:
+ logger.info started_request_message
+
+ if websocket.possible? && allow_request_origin?
+ respond_to_successful_request
+ else
+ respond_to_invalid_request
+ end
+ end
+
+ # Decodes WebSocket messages and dispatches them to subscribed channels.
+ # WebSocket message transfer encoding is always JSON.
+ def receive(websocket_message) #:nodoc:
+ send_async :dispatch_websocket_message, websocket_message
+ end
+
+ def dispatch_websocket_message(websocket_message) #:nodoc:
+ if websocket.alive?
+ subscriptions.execute_command decode(websocket_message)
+ else
+ logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
+ end
+ end
+
+ def transmit(cable_message) # :nodoc:
+ websocket.transmit encode(cable_message)
+ end
+
+ # Close the WebSocket connection.
+ def close(reason: nil, reconnect: true)
+ transmit(
+ type: ActionCable::INTERNAL[:message_types][:disconnect],
+ reason: reason,
+ reconnect: reconnect
+ )
+ websocket.close
+ end
+
+ # Invoke a method on the connection asynchronously through the pool of thread workers.
+ def send_async(method, *arguments)
+ worker_pool.async_invoke(self, method, *arguments)
+ end
+
+ # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
+ # This can be returned by a health check against the connection.
+ def statistics
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env["action_dispatch.request_id"]
+ }
+ end
+
+ def beat
+ transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
+ end
+
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # log errors to make diagnosing socket errors easier
+ logger.error "WebSocket error occurred: #{message}"
+ end
+
+ def on_close(reason, code) # :nodoc:
+ send_async :handle_close
+ end
+
+ private
+ attr_reader :websocket
+ attr_reader :message_buffer
+
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
+ def request # :doc:
+ @request ||= begin
+ environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
+ ActionDispatch::Request.new(environment || env)
+ end
+ end
+
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
+ def cookies # :doc:
+ request.cookie_jar
+ end
+
+ def encode(cable_message)
+ @coder.encode cable_message
+ end
+
+ def decode(websocket_message)
+ @coder.decode websocket_message
+ end
+
+ def handle_open
+ @protocol = websocket.protocol
+ connect if respond_to?(:connect)
+ subscribe_to_internal_channel
+ send_welcome_message
+
+ message_buffer.process!
+ server.add_connection(self)
+ rescue ActionCable::Connection::Authorization::UnauthorizedError
+ close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive?
+ end
+
+ def handle_close
+ logger.info finished_request_message
+
+ server.remove_connection(self)
+
+ subscriptions.unsubscribe_from_all
+ unsubscribe_from_internal_channel
+
+ disconnect if respond_to?(:disconnect)
+ end
+
+ def send_welcome_message
+ # Send welcome message to the internal connection monitor channel.
+ # This ensures the connection monitor state is reset after a successful
+ # websocket connection.
+ transmit type: ActionCable::INTERNAL[:message_types][:welcome]
+ end
+
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ proto = Rack::Request.new(env).ssl? ? "https" : "http"
+ if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
+ true
+ elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] }
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
+ def respond_to_successful_request
+ logger.info successful_request_message
+ websocket.rack_response
+ end
+
+ def respond_to_invalid_request
+ close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive?
+
+ logger.error invalid_request_message
+ logger.info finished_request_message
+ [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ]
+ end
+
+ # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
+ def new_tagged_logger
+ TaggedLoggerProxy.new server.logger,
+ tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
+ end
+
+ def started_request_message
+ 'Started %s "%s"%s for %s at %s' % [
+ request.request_method,
+ request.filtered_path,
+ websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
+ request.ip,
+ Time.now.to_s ]
+ end
+
+ def finished_request_message
+ 'Finished "%s"%s for %s at %s' % [
+ request.filtered_path,
+ websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
+ request.ip,
+ Time.now.to_s ]
+ end
+
+ def invalid_request_message
+ "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
+
+ def successful_request_message
+ "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..4b1964c4ae
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,157 @@
+# frozen_string_literal: true
+
+require "websocket/driver"
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? "wss:" : "ws:"
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+
+ def self.secure_request?(env)
+ return true if env["HTTPS"] == "on"
+ return true if env["HTTP_X_FORWARDED_SSL"] == "on"
+ return true if env["HTTP_X_FORWARDED_SCHEME"] == "https"
+ return true if env["HTTP_X_FORWARDED_PROTO"] == "https"
+ return true if env["rack.url_scheme"] == "https"
+
+ false
+ end
+
+ CONNECTING = 0
+ OPEN = 1
+ CLOSING = 2
+ CLOSED = 3
+
+ attr_reader :env, :url
+
+ def initialize(env, event_target, event_loop, protocols)
+ @env = env
+ @event_target = event_target
+ @event_loop = event_loop
+
+ @url = ClientSocket.determine_url(@env)
+
+ @driver = @driver_started = nil
+ @close_params = ["", 1006]
+
+ @ready_state = CONNECTING
+
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self, protocols: protocols)
+
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+
+ @stream = ActionCable::Connection::Stream.new(@event_loop, self)
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @stream.hijack_rack_socket
+
+ if callback = @env["async.callback"]
+ callback.call([101, {}, @stream])
+ end
+
+ @driver_started = true
+ @driver.start
+ end
+
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+
+ def write(data)
+ @stream.write(data)
+ rescue => e
+ emit_error e.message
+ end
+
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ""
+
+ unless code == 1000 || (code >= 3000 && code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " \
+ "The code must be either 1000, or between 3000 and 4999. " \
+ "#{code} is neither."
+ end
+
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+
+ def parse(data)
+ @driver.parse(data)
+ end
+
+ def client_gone
+ finalize_close
+ end
+
+ def alive?
+ @ready_state == OPEN
+ end
+
+ def protocol
+ @driver.protocol
+ end
+
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+
+ @event_target.on_open
+ end
+
+ def receive_message(data)
+ return unless @ready_state == OPEN
+
+ @event_target.on_message(data)
+ end
+
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+
+ @event_target.on_error(message)
+ end
+
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+
+ @stream.shutdown if @stream
+ finalize_close
+ end
+
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+
+ @event_target.on_close(*@close_params)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
new file mode 100644
index 0000000000..cc544685dd
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+
+require "set"
+
+module ActionCable
+ module Connection
+ module Identification
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :identifiers, default: Set.new
+ end
+
+ module ClassMethods
+ # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
+ # Common identifiers are current_user and current_account, but could be anything, really.
+ #
+ # Note that anything marked as an identifier will automatically create a delegate by the same name on any
+ # channel instances created off the connection.
+ def identified_by(*identifiers)
+ Array(identifiers).each { |identifier| attr_accessor identifier }
+ self.identifiers += identifiers
+ end
+ end
+
+ # Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
+ def connection_identifier
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
+ end
+
+ private
+ def connection_gid(ids)
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
new file mode 100644
index 0000000000..f03904137b
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -0,0 +1,45 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Connection
+ # Makes it possible for the RemoteConnection to disconnect a specific connection.
+ module InternalChannel
+ extend ActiveSupport::Concern
+
+ private
+ def internal_channel
+ "action_cable/#{connection_identifier}"
+ end
+
+ def subscribe_to_internal_channel
+ if connection_identifier.present?
+ callback = -> (message) { process_internal_message decode(message) }
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
+
+ server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
+ logger.info "Registered connection (#{connection_identifier})"
+ end
+ end
+
+ def unsubscribe_from_internal_channel
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
+ end
+ end
+
+ def process_internal_message(message)
+ case message["type"]
+ when "disconnect"
+ logger.info "Removing connection (#{connection_identifier})"
+ websocket.close
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ close
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
new file mode 100644
index 0000000000..965841b67e
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -0,0 +1,54 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Connection
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them.
+ class MessageBuffer # :nodoc:
+ def initialize(connection)
+ @connection = connection
+ @buffered_messages = []
+ end
+
+ def append(message)
+ if valid? message
+ if processing?
+ receive message
+ else
+ buffer message
+ end
+ else
+ connection.logger.error "Couldn't handle non-string message: #{message.class}"
+ end
+ end
+
+ def processing?
+ @processing
+ end
+
+ def process!
+ @processing = true
+ receive_buffered_messages
+ end
+
+ private
+ attr_reader :connection
+ attr_reader :buffered_messages
+
+ def valid?(message)
+ message.is_a?(String)
+ end
+
+ def receive(message)
+ connection.receive message
+ end
+
+ def buffer(message)
+ buffered_messages << message
+ end
+
+ def receive_buffered_messages
+ receive buffered_messages.shift until buffered_messages.empty?
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..e658948a55
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,117 @@
+# frozen_string_literal: true
+
+require "thread"
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream # :nodoc:
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env["stream.send"]
+
+ @rack_hijack_io = nil
+ @write_lock = Mutex.new
+
+ @write_head = nil
+ @write_buffer = Queue.new
+ end
+
+ def each(&callback)
+ @stream_send ||= callback
+ end
+
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+
+ def shutdown
+ clean_rack_hijack
+ end
+
+ def write(data)
+ if @stream_send
+ return @stream_send.call(data)
+ end
+
+ if @write_lock.try_lock
+ begin
+ if @write_head.nil? && @write_buffer.empty?
+ written = @rack_hijack_io.write_nonblock(data, exception: false)
+
+ case written
+ when :wait_writable
+ # proceed below
+ when data.bytesize
+ return data.bytesize
+ else
+ @write_head = data.byteslice(written, data.bytesize)
+ @event_loop.writes_pending @rack_hijack_io
+
+ return data.bytesize
+ end
+ end
+ ensure
+ @write_lock.unlock
+ end
+ end
+
+ @write_buffer << data
+ @event_loop.writes_pending @rack_hijack_io
+
+ data.bytesize
+ rescue EOFError, Errno::ECONNRESET
+ @socket_object.client_gone
+ end
+
+ def flush_write_buffer
+ @write_lock.synchronize do
+ loop do
+ if @write_head.nil?
+ return true if @write_buffer.empty?
+ @write_head = @write_buffer.pop
+ end
+
+ written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
+ case written
+ when :wait_writable
+ return false
+ when @write_head.bytesize
+ @write_head = nil
+ else
+ @write_head = @write_head.byteslice(written, @write_head.bytesize)
+ return false
+ end
+ end
+ end
+ end
+
+ def receive(data)
+ @socket_object.parse(data)
+ end
+
+ def hijack_rack_socket
+ return unless @socket_object.env["rack.hijack"]
+
+ # This should return the underlying io according to the SPEC:
+ @rack_hijack_io = @socket_object.env["rack.hijack"].call
+ # Retain existing behaviour if required:
+ @rack_hijack_io ||= @socket_object.env["rack.hijack_io"]
+
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+
+ private
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..d95afc50ba
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,136 @@
+# frozen_string_literal: true
+
+require "nio"
+require "thread"
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = @executor = @thread = nil
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ @spawn_mutex = Mutex.new
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ spawn
+ @executor << task
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
+ end
+ wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister io
+ @map.delete io
+ io.close
+ end
+ wakeup
+ end
+
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
+ end
+ wakeup
+ end
+
+ def stop
+ @stopping = true
+ wakeup if @nio
+ end
+
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
+ @thread = Thread.new { run }
+
+ return true
+ end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
+
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = monitor.value
+
+ begin
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
new file mode 100644
index 0000000000..1ad8d05107
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -0,0 +1,79 @@
+# frozen_string_literal: true
+
+require "active_support/core_ext/hash/indifferent_access"
+
+module ActionCable
+ module Connection
+ # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
+ # the connection to the proper channel.
+ class Subscriptions # :nodoc:
+ def initialize(connection)
+ @connection = connection
+ @subscriptions = {}
+ end
+
+ def execute_command(data)
+ case data["command"]
+ when "subscribe" then add data
+ when "unsubscribe" then remove data
+ when "message" then perform_action data
+ else
+ logger.error "Received unrecognized command in #{data.inspect}"
+ end
+ rescue Exception => e
+ logger.error "Could not execute command from (#{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
+ end
+
+ def add(data)
+ id_key = data["identifier"]
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+
+ return if subscriptions.key?(id_key)
+
+ subscription_klass = id_options[:channel].safe_constantize
+
+ if subscription_klass && ActionCable::Channel::Base >= subscription_klass
+ subscription = subscription_klass.new(connection, id_key, id_options)
+ subscriptions[id_key] = subscription
+ subscription.subscribe_to_channel
+ else
+ logger.error "Subscription class not found: #{id_options[:channel].inspect}"
+ end
+ end
+
+ def remove(data)
+ logger.info "Unsubscribing from channel: #{data['identifier']}"
+ remove_subscription find(data)
+ end
+
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
+ end
+
+ def perform_action(data)
+ find(data).perform_action ActiveSupport::JSON.decode(data["data"])
+ end
+
+ def identifiers
+ subscriptions.keys
+ end
+
+ def unsubscribe_from_all
+ subscriptions.each { |id, channel| remove_subscription(channel) }
+ end
+
+ private
+ attr_reader :connection, :subscriptions
+ delegate :logger, to: :connection
+
+ def find(data)
+ if subscription = subscriptions[data["identifier"]]
+ subscription
+ else
+ raise "Unable to find subscription with identifier: #{data['identifier']}"
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
new file mode 100644
index 0000000000..85831806a9
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Connection
+ # Allows the use of per-connection tags against the server logger. This wouldn't work using the traditional
+ # <tt>ActiveSupport::TaggedLogging</tt> enhanced Rails.logger, as that logger will reset the tags between requests.
+ # The connection is long-lived, so it needs its own set of tags for its independent duration.
+ class TaggedLoggerProxy
+ attr_reader :tags
+
+ def initialize(logger, tags:)
+ @logger = logger
+ @tags = tags.flatten
+ end
+
+ def add_tags(*tags)
+ @tags += tags.flatten
+ @tags = @tags.uniq
+ end
+
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+
+ %i( debug info warn error fatal unknown ).each do |severity|
+ define_method(severity) do |message|
+ log severity, message
+ end
+ end
+
+ private
+ def log(type, message) # :doc:
+ tag(@logger) { @logger.send type, message }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
new file mode 100644
index 0000000000..31f29fdd2f
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+require "websocket/driver"
+
+module ActionCable
+ module Connection
+ # Wrap the real socket to minimize the externally-presented API
+ class WebSocket # :nodoc:
+ def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols])
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil
+ end
+
+ def possible?
+ websocket
+ end
+
+ def alive?
+ websocket && websocket.alive?
+ end
+
+ def transmit(data)
+ websocket.transmit data
+ end
+
+ def close
+ websocket.close
+ end
+
+ def protocol
+ websocket.protocol
+ end
+
+ def rack_response
+ websocket.rack_response
+ end
+
+ private
+ attr_reader :websocket
+ end
+ end
+end