aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb56
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb150
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb12
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb59
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb94
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb22
7 files changed, 363 insertions, 32 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 977856d656..1acef93025 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -35,7 +35,7 @@ module ActionCable
#
# First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
- # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
@@ -49,18 +49,18 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :pubsub, to: :server
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
- @_internal_redis_subscriptions = nil
+ @_internal_subscriptions = nil
@started_at = Time.now
end
@@ -70,10 +70,6 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
- websocket.on(:open) { |event| send_async :on_open }
- websocket.on(:message) { |event| on_message event.data }
- websocket.on(:close) { |event| send_async :on_close }
-
respond_to_successful_request
else
respond_to_invalid_request
@@ -103,7 +99,7 @@ module ActionCable
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
- worker_pool.async.invoke(self, method, *arguments)
+ worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
@@ -121,6 +117,22 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+
+ def on_close(reason, code) # :nodoc:
+ send_async :handle_close
+ end
+
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@@ -139,7 +151,7 @@ module ActionCable
attr_reader :message_buffer
private
- def on_open
+ def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -150,11 +162,7 @@ module ActionCable
respond_to_invalid_request
end
- def on_message(message)
- message_buffer.append message
- end
-
- def on_close
+ def handle_close
logger.info finished_request_message
server.remove_connection(self)
@@ -177,12 +185,14 @@ module ActionCable
end
def respond_to_successful_request
+ logger.info successful_request_message
websocket.rack_response
end
def respond_to_invalid_request
close if websocket.alive?
+ logger.error invalid_request_message
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
@@ -197,7 +207,7 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
@@ -205,10 +215,22 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
+
+ def invalid_request_message
+ 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
+
+ def successful_request_message
+ 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..ef937d7c16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,150 @@
+require 'websocket/driver'
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? 'wss:' : 'ws:'
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+
+ def self.secure_request?(env)
+ return true if env['HTTPS'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
+ return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
+ return true if env['rack.url_scheme'] == 'https'
+
+ return false
+ end
+
+ CONNECTING = 0
+ OPEN = 1
+ CLOSING = 2
+ CLOSED = 3
+
+ attr_reader :env, :url
+
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+ @stream_event_loop = stream_event_loop
+
+ @url = ClientSocket.determine_url(@env)
+
+ @driver = @driver_started = nil
+ @close_params = ['', 1006]
+
+ @ready_state = CONNECTING
+
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self)
+
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+
+ @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @driver_started = true
+ @driver.start
+ end
+
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+
+ def write(data)
+ @stream.write(data)
+ end
+
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ''
+
+ unless code == 1000 or (code >= 3000 and code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
+ "The code must be either 1000, or between 3000 and 4999. " +
+ "#{code} is neither."
+ end
+
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+
+ def parse(data)
+ @driver.parse(data)
+ end
+
+ def client_gone
+ finalize_close
+ end
+
+ def alive?
+ @ready_state == OPEN
+ end
+
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+
+ @event_target.on_open
+ end
+
+ def receive_message(data)
+ return unless @ready_state == OPEN
+
+ @event_target.on_message(data)
+ end
+
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+
+ @event_target.on_error(message)
+ end
+
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+
+ if @stream
+ @stream.shutdown
+ else
+ finalize_close
+ end
+ end
+
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+
+ @event_target.on_close(*@close_params)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
index 2d75ff8d6d..885ff3f102 100644
--- a/actioncable/lib/action_cable/connection/identification.rb
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -11,7 +11,7 @@ module ActionCable
end
class_methods do
- # Mark a key as being a connection identifier index that can then used to find the specific connection again later.
+ # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
# Common identifiers are current_user and current_account, but could be anything really.
#
# Note that anything marked as an identifier will automatically create a delegate by the same name on any
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index c065a24ab7..27826792b3 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -5,24 +5,24 @@ module ActionCable
extend ActiveSupport::Concern
private
- def internal_redis_channel
+ def internal_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
- @_internal_redis_subscriptions ||= []
- @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
- if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..ace250cd16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,59 @@
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env['stream.send']
+
+ @rack_hijack_io = nil
+
+ hijack_rack_socket
+ end
+
+ def each(&callback)
+ @stream_send ||= callback
+ end
+
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+
+ def shutdown
+ clean_rack_hijack
+ end
+
+ def write(data)
+ return @rack_hijack_io.write(data) if @rack_hijack_io
+ return @stream_send.call(data) if @stream_send
+ rescue EOFError
+ @socket_object.client_gone
+ end
+
+ def receive(data)
+ @socket_object.parse(data)
+ end
+
+ private
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
+
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
+
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..e6335082d2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,94 @@
+require 'nio'
+require 'thread'
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = @thread = nil
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ @spawn_mutex = Mutex.new
+ spawn
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister io
+ @map.delete io
+ end
+ wakeup
+ end
+
+ def stop
+ @stopping = true
+ wakeup if @nio
+ end
+
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
+
+ return true
+ end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
+
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 670d5690ae..5e89fb9b72 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -1,13 +1,11 @@
-require 'faye/websocket'
+require 'websocket/driver'
module ActionCable
module Connection
- # Decorate the Faye::WebSocket with helpers we need.
+ # Wrap the real socket to minimize the externally-presented API
class WebSocket
- delegate :rack_response, :close, :on, to: :websocket
-
- def initialize(env)
- @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ def initialize(env, event_target, stream_event_loop)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
end
def possible?
@@ -15,11 +13,19 @@ module ActionCable
end
def alive?
- websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ websocket && websocket.alive?
end
def transmit(data)
- websocket.send data
+ websocket.transmit data
+ end
+
+ def close
+ websocket.close
+ end
+
+ def rack_response
+ websocket.rack_response
end
protected