aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/CHANGELOG.md13
-rw-r--r--actioncable/README.md2
-rw-r--r--actioncable/Rakefile2
-rw-r--r--actioncable/lib/action_cable/channel/base.rb38
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb10
-rw-r--r--actioncable/lib/action_cable/connection.rb2
-rw-r--r--actioncable/lib/action_cable/connection/base.rb4
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb2
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb48
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb44
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb58
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb47
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb6
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb12
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb18
-rw-r--r--actioncable/lib/action_cable/server/worker.rb2
-rw-r--r--actioncable/test/channel/base_test.rb15
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb1
-rw-r--r--actioncable/test/channel/rejection_test.rb2
-rw-r--r--actioncable/test/channel/stream_test.rb31
-rw-r--r--actioncable/test/client_test.rb132
-rw-r--r--actioncable/test/connection/client_socket_test.rb16
-rw-r--r--actioncable/test/connection/stream_test.rb2
-rw-r--r--actioncable/test/server/base_test.rb33
-rw-r--r--actioncable/test/stubs/test_server.rb12
-rw-r--r--actioncable/test/subscription_adapter/common.rb4
-rw-r--r--actioncable/test/subscription_adapter/evented_redis_test.rb8
-rw-r--r--actioncable/test/test_helper.rb48
29 files changed, 332 insertions, 284 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index dec6f7c027..137c88d91b 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,16 @@
+* Prevent race where the client could receive and act upon a
+ subscription confirmation before the channel's `subscribed` method
+ completed.
+
+ Fixes #25381.
+
+ *Vladimir Dementyev*
+
+* Buffer writes to websocket connections, to avoid blocking threads
+ that could be doing more useful things.
+
+ *Matthew Draper*, *Tinco Andringa*
+
* Protect against concurrent writes to a websocket connection from
multiple threads; the underlying OS write is not always threadsafe.
diff --git a/actioncable/README.md b/actioncable/README.md
index 28e2602cbf..a0b7412dd4 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -167,7 +167,7 @@ App.cable.subscriptions.create "AppearanceChannel",
buttonSelector = "[data-behavior~=appear_away]"
install: ->
- $(document).on "page:change.appearance", =>
+ $(document).on "turbolinks:load.appearance", =>
@appear()
$(document).on "click.appearance", buttonSelector, =>
diff --git a/actioncable/Rakefile b/actioncable/Rakefile
index 34649547a2..648de57004 100644
--- a/actioncable/Rakefile
+++ b/actioncable/Rakefile
@@ -21,7 +21,7 @@ namespace :test do
task :isolated do
Dir.glob("test/**/*_test.rb").all? do |file|
sh(Gem.ruby, "-w", "-Ilib:test", file)
- end or raise "Failures"
+ end || raise("Failures")
end
task :integration do
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 2e589a2cfa..a866044f95 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -144,13 +144,14 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
- @defer_subscription_confirmation = false
+ #
+ # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
- subscribe_to_channel
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
@@ -169,6 +170,17 @@ module ActionCable
end
end
+ # This method is called after subscription has been added to the connection
+ # and confirms or rejects the subscription.
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ reject_subscription if subscription_rejected?
+ ensure_confirmation_sent
+ end
+
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc:
@@ -201,12 +213,18 @@ module ActionCable
end
end
+ def ensure_confirmation_sent
+ return if subscription_rejected?
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
def defer_subscription_confirmation!
- @defer_subscription_confirmation = true
+ @defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation
+ @defer_subscription_confirmation_counter.value > 0
end
def subscription_confirmation_sent?
@@ -230,18 +248,6 @@ module ActionCable
end
end
- def subscribe_to_channel
- run_callbacks :subscribe do
- subscribed
- end
-
- if subscription_rejected?
- reject_subscription
- else
- transmit_subscription_confirmation unless defer_subscription_confirmation?
- end
- end
-
def extract_action(data)
(data["action"].presence || :receive).to_sym
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 561750d713..dbba333353 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -69,8 +69,8 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
- # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
- # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
@@ -84,7 +84,7 @@ module ActionCable
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
- transmit_subscription_confirmation
+ ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
@@ -94,8 +94,8 @@ module ActionCable
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
#
- # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
- # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
def stream_for(model, callback = nil, coder: nil, &block)
stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index 5f813cf8e0..902efb07e2 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -8,8 +8,6 @@ module ActionCable
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
- autoload :FayeClientSocket
- autoload :FayeEventLoop
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index b0615b08a1..06f4f5edd3 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -57,7 +57,7 @@ module ActionCable
@worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -105,7 +105,7 @@ module ActionCable
worker_pool.async_invoke(self, method, *arguments)
end
- # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
# This can be returned by a health check against the connection.
def statistics
{
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index c48f000d9c..70a2bbecb1 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -89,7 +89,7 @@ module ActionCable
code ||= 1000
reason ||= ""
- unless code == 1000 or (code >= 3000 and code <= 4999)
+ unless code == 1000 || (code >= 3000 && code <= 4999)
raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
"The code must be either 1000, or between 3000 and 4999. " +
"#{code} is neither."
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
deleted file mode 100644
index 06e92c5d52..0000000000
--- a/actioncable/lib/action_cable/connection/faye_client_socket.rb
+++ /dev/null
@@ -1,48 +0,0 @@
-require "faye/websocket"
-
-module ActionCable
- module Connection
- class FayeClientSocket
- def initialize(env, event_target, stream_event_loop, protocols)
- @env = env
- @event_target = event_target
- @protocols = protocols
-
- @faye = nil
- end
-
- def alive?
- @faye && @faye.ready_state == Faye::WebSocket::API::OPEN
- end
-
- def transmit(data)
- connect
- @faye.send data
- end
-
- def close
- @faye && @faye.close
- end
-
- def protocol
- @faye && @faye.protocol
- end
-
- def rack_response
- connect
- @faye.rack_response
- end
-
- private
- def connect
- return if @faye
- @faye = Faye::WebSocket.new(@env, @protocols)
-
- @faye.on(:open) { |event| @event_target.on_open }
- @faye.on(:message) { |event| @event_target.on_message(event.data) }
- @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
- @faye.on(:error) { |event| @event_target.on_error(event.message) }
- end
- end
- end
-end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
deleted file mode 100644
index cfbe26ee6a..0000000000
--- a/actioncable/lib/action_cable/connection/faye_event_loop.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-require "thread"
-
-require "eventmachine"
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
-module ActionCable
- module Connection
- class FayeEventLoop
- @@mutex = Mutex.new
-
- def timer(interval, &block)
- ensure_reactor_running
- EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
- end
-
- def post(task = nil, &block)
- task ||= block
-
- ensure_reactor_running
- ::EM.next_tick(&task)
- end
-
- private
- def ensure_reactor_running
- return if EventMachine.reactor_running?
- @@mutex.synchronize do
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
- end
- end
-
- class EMTimer
- def initialize(inner)
- @inner = inner
- end
-
- def shutdown
- @inner.cancel
- end
- end
- end
- end
-end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 5a2aace0ba..e620b93845 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -14,6 +14,9 @@ module ActionCable
@rack_hijack_io = nil
@write_lock = Mutex.new
+
+ @write_head = nil
+ @write_buffer = Queue.new
end
def each(&callback)
@@ -30,14 +33,62 @@ module ActionCable
end
def write(data)
- @write_lock.synchronize do
- return @rack_hijack_io.write(data) if @rack_hijack_io
- return @stream_send.call(data) if @stream_send
+ if @stream_send
+ return @stream_send.call(data)
end
+
+ if @write_lock.try_lock
+ begin
+ if @write_head.nil? && @write_buffer.empty?
+ written = @rack_hijack_io.write_nonblock(data, exception: false)
+
+ case written
+ when :wait_writable
+ # proceed below
+ when data.bytesize
+ return data.bytesize
+ else
+ @write_head = data.byteslice(written, data.bytesize)
+ @event_loop.writes_pending @rack_hijack_io
+
+ return data.bytesize
+ end
+ end
+ ensure
+ @write_lock.unlock
+ end
+ end
+
+ @write_buffer << data
+ @event_loop.writes_pending @rack_hijack_io
+
+ data.bytesize
rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
+ def flush_write_buffer
+ @write_lock.synchronize do
+ loop do
+ if @write_head.nil?
+ return true if @write_buffer.empty?
+ @write_head = @write_buffer.pop
+ end
+
+ written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
+ case written
+ when :wait_writable
+ return false
+ when @write_head.bytesize
+ @write_head = nil
+ else
+ @write_head = @write_head.byteslice(written, @write_head.bytesize)
+ return false
+ end
+ end
+ end
+ end
+
def receive(data)
@socket_object.parse(data)
end
@@ -55,7 +106,6 @@ module ActionCable
def clean_rack_hijack
return unless @rack_hijack_io
@event_loop.detach(@rack_hijack_io, self)
- @rack_hijack_io.close
@rack_hijack_io = nil
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index 106b948c45..2d1af0ff9f 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -5,7 +5,7 @@ module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = @thread = nil
+ @nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@@ -20,13 +20,14 @@ module ActionCable
def post(task = nil, &block)
task ||= block
- Concurrent.global_io_executor << task
+ spawn
+ @executor << task
end
def attach(io, stream)
@todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
+ @map[io] = @nio.register(io, :r)
+ @map[io].value = stream
end
wakeup
end
@@ -35,6 +36,16 @@ module ActionCable
@todo << lambda do
@nio.deregister io
@map.delete io
+ io.close
+ end
+ wakeup
+ end
+
+ def writes_pending(io)
+ @todo << lambda do
+ if monitor = @map[io]
+ monitor.interests = :rw
+ end
end
wakeup
end
@@ -52,6 +63,13 @@ module ActionCable
return if @thread && @thread.status
@nio ||= NIO::Selector.new
+
+ @executor ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: 10,
+ max_queue: 0,
+ )
+
@thread = Thread.new { run }
return true
@@ -77,12 +95,25 @@ module ActionCable
monitors.each do |monitor|
io = monitor.io
- stream = @map[io]
+ stream = monitor.value
begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
+ if monitor.writable?
+ if stream.flush_write_buffer
+ monitor.interests = :r
+ end
+ next unless monitor.readable?
+ end
+
+ incoming = io.read_nonblock(4096, exception: false)
+ case incoming
+ when :wait_readable
+ next
+ when nil
+ stream.close
+ else
+ stream.receive incoming
+ end
rescue
# We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 9060183249..00511aead5 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -26,10 +26,14 @@ module ActionCable
id_key = data["identifier"]
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ return if subscriptions.key?(id_key)
+
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
- subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ subscription = subscription_klass.new(connection, id_key, id_options)
+ subscriptions[id_key] = subscription
+ subscription.subscribe_to_channel
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 52d8daad4b..382141b89f 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols])
- @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil
+ def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols])
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil
end
def possible?
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index c700297a8d..419eccd73c 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -37,9 +37,13 @@ module ActionCable
connections.each(&:close)
@mutex.synchronize do
- worker_pool.halt if @worker_pool
-
+ # Shutdown the worker pool
+ @worker_pool.halt if @worker_pool
@worker_pool = nil
+
+ # Shutdown the pub/sub adapter
+ @pubsub.shutdown if @pubsub
+ @pubsub = nil
end
end
@@ -49,12 +53,12 @@ module ActionCable
end
def event_loop
- @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
+ @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
# The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
- # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`.
+ # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>.
#
# Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
# Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 7153593d4c..dc146f07b0 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -4,7 +4,7 @@ module ActionCable
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
- attr_accessor :use_faye, :connection_class, :worker_pool_size
+ attr_accessor :connection_class, :worker_pool_size
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path
@@ -35,22 +35,6 @@ module ActionCable
adapter = "PostgreSQL" if adapter == "Postgresql"
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
-
- def event_loop_class
- if use_faye
- ActionCable::Connection::FayeEventLoop
- else
- ActionCable::Connection::StreamEventLoop
- end
- end
-
- def client_socket_class
- if use_faye
- ActionCable::Connection::FayeClientSocket
- else
- ActionCable::Connection::ClientSocket
- end
- end
end
end
end
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 7460472551..43639c27af 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -25,7 +25,7 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @executor.kill
+ @executor.shutdown
end
def stopping?
diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb
index 2bb3214f74..9a3a3581e6 100644
--- a/actioncable/test/channel/base_test.rb
+++ b/actioncable/test/channel/base_test.rb
@@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
@channel = ChatChannel.new @connection, "{id: 1}", id: 1
end
- test "should subscribe to a channel on initialize" do
+ test "should subscribe to a channel" do
+ @channel.subscribe_to_channel
assert_equal 1, @channel.room.id
end
test "on subscribe callbacks" do
+ @channel.subscribe_to_channel
assert @channel.subscribed
end
@@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
end
test "unsubscribing from a channel" do
+ @channel.subscribe_to_channel
+
assert @channel.room
assert @channel.subscribed?
@@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
assert_equal expected, @connection.last_transmission
end
- test "subscription confirmation" do
+ test "do not send subscription confirmation on initialize" do
+ assert_nil @connection.last_transmission
+ end
+
+ test "subscription confirmation on subscribe_to_channel" do
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
+ @channel.subscribe_to_channel
assert_equal expected, @connection.last_transmission
end
@@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
test "notification for transmit_subscription_confirmation" do
begin
+ @channel.subscribe_to_channel
+
events = []
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
events << ActiveSupport::Notifications::Event.new(*args)
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 529abb9535..2ee711fd29 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
@connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil))
channel = ChatChannel.new @connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
channel.unsubscribe_from_channel
assert_equal [], channel.send(:active_periodic_timers)
end
diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb
index faf35ad048..99c4a7603a 100644
--- a/actioncable/test/channel/rejection_test.rb
+++ b/actioncable/test/channel/rejection_test.rb
@@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "subscription rejection" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
+ @channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission
@@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "does not execute action if subscription is rejected" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
+ @channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index da26f81a5c..31dcde2e29 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -53,6 +53,7 @@ module ActionCable::StreamTests
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
@@ -64,6 +65,7 @@ module ActionCable::StreamTests
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = SymbolChannel.new connection, ""
+ channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
@@ -76,6 +78,7 @@ module ActionCable::StreamTests
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, ""
+ channel.subscribe_to_channel
channel.stream_for Room.new(1)
end
end
@@ -84,7 +87,9 @@ module ActionCable::StreamTests
run_in_eventmachine do
connection = TestConnection.new
- ChatChannel.new connection, "{id: 1}", id: 1
+ channel = ChatChannel.new connection, "{id: 1}", id: 1
+ channel.subscribe_to_channel
+
assert_nil connection.last_transmission
wait_for_async
@@ -114,7 +119,7 @@ module ActionCable::StreamTests
end
end
- require "action_cable/subscription_adapter/inline"
+ require "action_cable/subscription_adapter/async"
class UserCallbackChannel < ActionCable::Channel::Base
def subscribed
@@ -124,9 +129,16 @@ module ActionCable::StreamTests
end
end
- class StreamEncodingTest < ActionCable::TestCase
+ class MultiChatChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "main_room"
+ stream_from "test_all_rooms"
+ end
+ end
+
+ class StreamFromTest < ActionCable::TestCase
setup do
- @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
+ @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
@@ -153,6 +165,17 @@ module ActionCable::StreamTests
end
end
+ test "subscription confirmation should only be sent out once with muptiple stream_from" do
+ run_in_eventmachine do
+ connection = open_connection
+ expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
+ connection.websocket.expects(:transmit).with(expected.to_json)
+ receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})
+
+ wait_for_async
+ end
+ end
+
private
def subscribe_to(connection, identifiers:)
receive connection, command: "subscribe", identifiers: identifiers
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index 2e3821828f..db10a7ad16 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -1,13 +1,31 @@
require "test_helper"
require "concurrent"
-require "faye/websocket"
+require "websocket-client-simple"
require "json"
require "active_support/hash_with_indifferent_access"
+####
+# 😷 Warning suppression 😷
+WebSocket::Frame::Handler::Handler03.prepend Module.new {
+ def initialize(*)
+ @application_data_buffer = nil
+ super
+ end
+}
+
+WebSocket::Frame::Data.prepend Module.new {
+ def initialize(*)
+ @masking_key = nil
+ super
+ end
+}
+#
+####
+
class ClientTest < ActionCable::TestCase
- WAIT_WHEN_EXPECTING_EVENT = 8
+ WAIT_WHEN_EXPECTING_EVENT = 2
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
class EchoChannel < ActionCable::Channel::Base
@@ -39,20 +57,9 @@ class ClientTest < ActionCable::TestCase
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async")
- server.config.use_faye = ENV["FAYE"].present?
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true
-
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
-
- # faye-websocket is warning-rich
- @previous_verbose, $VERBOSE = $VERBOSE, nil
- end
-
- def teardown
- $VERBOSE = @previous_verbose
end
def with_puma_server(rack_app = ActionCable.server, port = 3099)
@@ -73,44 +80,49 @@ class ClientTest < ActionCable::TestCase
attr_reader :pings
def initialize(port)
- @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
- @messages = Queue.new
- @closed = Concurrent::Event.new
- @has_messages = Concurrent::Semaphore.new(0)
- @pings = 0
-
- open = Concurrent::Event.new
- error = nil
-
- @ws.on(:error) do |event|
- if open.set?
- @messages << RuntimeError.new(event.message)
- else
- error = event.message
- open.set
+ messages = @messages = Queue.new
+ closed = @closed = Concurrent::Event.new
+ has_messages = @has_messages = Concurrent::Semaphore.new(0)
+ pings = @pings = Concurrent::AtomicFixnum.new(0)
+
+ open = Concurrent::Promise.new
+
+ @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws|
+ ws.on(:error) do |event|
+ event = RuntimeError.new(event.message) unless event.is_a?(Exception)
+
+ if open.pending?
+ open.fail(event)
+ else
+ messages << event
+ has_messages.release
+ end
end
- end
- @ws.on(:open) do |event|
- open.set
- end
+ ws.on(:open) do |event|
+ open.set(true)
+ end
- @ws.on(:message) do |event|
- message = JSON.parse(event.data)
- if message["type"] == "ping"
- @pings += 1
- else
- @messages << message
- @has_messages.release
+ ws.on(:message) do |event|
+ if event.type == :close
+ closed.set
+ else
+ message = JSON.parse(event.data)
+ if message["type"] == "ping"
+ pings.increment
+ else
+ messages << message
+ has_messages.release
+ end
+ end
end
- end
- @ws.on(:close) do |event|
- @closed.set
+ ws.on(:close) do |event|
+ closed.set
+ end
end
- open.wait(WAIT_WHEN_EXPECTING_EVENT)
- raise error if error
+ open.wait!(WAIT_WHEN_EXPECTING_EVENT)
end
def read_message
@@ -161,13 +173,17 @@ class ClientTest < ActionCable::TestCase
end
end
- def faye_client(port)
+ def websocket_client(port)
SyncClient.new(port)
end
+ def concurrently(enum)
+ enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!)
+ end
+
def test_single_client
with_puma_server do |port|
- c = faye_client(port)
+ c = websocket_client(port)
assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
@@ -179,12 +195,12 @@ class ClientTest < ActionCable::TestCase
def test_interacting_clients
with_puma_server do |port|
- clients = 10.times.map { faye_client(port) }
+ clients = concurrently(10.times) { websocket_client(port) }
barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
- clients.map { |c| Concurrent::Future.execute {
+ concurrently(clients) do |c|
assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message)
@@ -194,38 +210,38 @@ class ClientTest < ActionCable::TestCase
c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello")
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
assert_equal clients.size, c.read_messages(clients.size).size
- } }.each(&:wait!)
+ end
- clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ concurrently(clients, &:close)
end
end
def test_many_clients
with_puma_server do |port|
- clients = 100.times.map { faye_client(port) }
+ clients = concurrently(100.times) { websocket_client(port) }
- clients.map { |c| Concurrent::Future.execute {
+ concurrently(clients) do |c|
assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message)
c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello")
assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message)
- } }.each(&:wait!)
+ end
- clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ concurrently(clients, &:close)
end
end
def test_disappearing_client
with_puma_server do |port|
- c = faye_client(port)
+ c = websocket_client(port)
assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello")
c.close # disappear before write
- c = faye_client(port)
+ c = websocket_client(port)
assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
@@ -240,7 +256,7 @@ class ClientTest < ActionCable::TestCase
app = ActionCable.server
identifier = JSON.generate(channel: "ClientTest::EchoChannel")
- c = faye_client(port)
+ c = websocket_client(port)
assert_equal({ "type" => "welcome" }, c.read_message)
c.send_message command: "subscribe", identifier: identifier
assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
@@ -261,7 +277,7 @@ class ClientTest < ActionCable::TestCase
def test_server_restart
with_puma_server do |port|
- c = faye_client(port)
+ c = websocket_client(port)
assert_equal({ "type" => "welcome" }, c.read_message)
c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb
index 5043a63370..bc3ff6a3d7 100644
--- a/actioncable/test/connection/client_socket_test.rb
+++ b/actioncable/test/connection/client_socket_test.rb
@@ -33,8 +33,6 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase
end
test "delegate socket errors to on_error handler" do
- skip if ENV["FAYE"].present?
-
run_in_eventmachine do
connection = open_connection
@@ -49,16 +47,16 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase
end
test "closes hijacked i/o socket at shutdown" do
- skip if ENV["FAYE"].present?
-
run_in_eventmachine do
connection = open_connection
client = connection.websocket.send(:websocket)
+ event = Concurrent::Event.new
client.instance_variable_get("@stream")
.instance_variable_get("@rack_hijack_io")
- .expects(:close)
+ .define_singleton_method(:close) { event.set }
connection.close
+ event.wait
end
end
@@ -67,7 +65,13 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase
env = Rack::MockRequest.env_for "/test",
"HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket",
"HTTP_HOST" => "localhost", "HTTP_ORIGIN" => "http://rubyonrails.com"
- env["rack.hijack"] = -> { env["rack.hijack_io"] = StringIO.new }
+ io = \
+ begin
+ Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM, 0).first
+ rescue
+ StringIO.new
+ end
+ env["rack.hijack"] = -> { env["rack.hijack_io"] = io }
Connection.new(@server, env).tap do |connection|
connection.process
diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb
index 4128b32f15..36e1d3c095 100644
--- a/actioncable/test/connection/stream_test.rb
+++ b/actioncable/test/connection/stream_test.rb
@@ -34,8 +34,6 @@ class ActionCable::Connection::StreamTest < ActionCable::TestCase
[ EOFError, Errno::ECONNRESET ].each do |closed_exception|
test "closes socket on #{closed_exception}" do
- skip if ENV["FAYE"].present?
-
run_in_eventmachine do
connection = open_connection
diff --git a/actioncable/test/server/base_test.rb b/actioncable/test/server/base_test.rb
new file mode 100644
index 0000000000..f0a51c5a7d
--- /dev/null
+++ b/actioncable/test/server/base_test.rb
@@ -0,0 +1,33 @@
+require "test_helper"
+require "stubs/test_server"
+require "active_support/core_ext/hash/indifferent_access"
+
+class BaseTest < ActiveSupport::TestCase
+ def setup
+ @server = ActionCable::Server::Base.new
+ @server.config.cable = { adapter: "async" }.with_indifferent_access
+ end
+
+ class FakeConnection
+ def close
+ end
+ end
+
+ test "#restart closes all open connections" do
+ conn = FakeConnection.new
+ @server.add_connection(conn)
+
+ conn.expects(:close)
+ @server.restart
+ end
+
+ test "#restart shuts down worker pool" do
+ @server.worker_pool.expects(:halt)
+ @server.restart
+ end
+
+ test "#restart shuts down pub/sub adapter" do
+ @server.pubsub.expects(:shutdown)
+ @server.restart
+ end
+end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index b64ff33789..5bf2a151dc 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -10,12 +10,6 @@ class TestServer
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter)
- @config.use_faye = ENV["FAYE"].present?
- @config.client_socket_class = if @config.use_faye
- ActionCable::Connection::FayeClientSocket
- else
- ActionCable::Connection::ClientSocket
- end
@mutex = Monitor.new
end
@@ -25,10 +19,8 @@ class TestServer
end
def event_loop
- @event_loop ||= if @config.use_faye
- ActionCable::Connection::FayeEventLoop.new
- else
- ActionCable::Connection::StreamEventLoop.new
+ @event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop|
+ loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
end
end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
index 1538157995..3aa88c2caa 100644
--- a/actioncable/test/subscription_adapter/common.rb
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -11,7 +11,7 @@ module CommonSubscriptionAdapterTest
def setup
server = ActionCable::Server::Base.new
server.config.cable = cable_config.with_indifferent_access
- server.config.use_faye = ENV["FAYE"].present?
+ server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
adapter_klass = server.config.pubsub_adapter
@@ -20,7 +20,7 @@ module CommonSubscriptionAdapterTest
end
def teardown
- [@rx_adapter, @tx_adapter].uniq.each(&:shutdown)
+ [@rx_adapter, @tx_adapter].uniq.compact.each(&:shutdown)
end
def subscribe_as_queue(channel, adapter = @rx_adapter)
diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb
index d6ca0e77cb..f316bc46ef 100644
--- a/actioncable/test/subscription_adapter/evented_redis_test.rb
+++ b/actioncable/test/subscription_adapter/evented_redis_test.rb
@@ -12,6 +12,14 @@ class EventedRedisAdapterTest < ActionCable::TestCase
end
def teardown
+ super
+
+ # Ensure EM is shut down before we re-enable warnings
+ EventMachine.reactor_thread.tap do |thread|
+ EventMachine.stop
+ thread.join
+ end
+
$VERBOSE = @previous_verbose
end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 39855ea252..a47032753b 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -13,41 +13,7 @@ end
# Require all the stubs and models
Dir[File.dirname(__FILE__) + "/stubs/*.rb"].each { |file| require file }
-if ENV["FAYE"].present?
- require "faye/websocket"
- class << Faye::WebSocket
- remove_method :ensure_reactor_running
-
- # We don't want Faye to start the EM reactor in tests because it makes testing much harder.
- # We want to be able to start and stop EM loop in tests to make things simpler.
- def ensure_reactor_running
- # no-op
- end
- end
-end
-
-module EventMachineConcurrencyHelpers
- def wait_for_async
- EM.run_deferred_callbacks
- end
-
- def run_in_eventmachine
- failure = nil
- EM.run do
- begin
- yield
- rescue => ex
- failure = ex
- ensure
- wait_for_async
- EM.stop if EM.reactor_running?
- end
- end
- raise failure if failure
- end
-end
-
-module ConcurrentRubyConcurrencyHelpers
+class ActionCable::TestCase < ActiveSupport::TestCase
def wait_for_async
wait_for_executor Concurrent.global_io_executor
end
@@ -56,18 +22,14 @@ module ConcurrentRubyConcurrencyHelpers
yield
wait_for_async
end
-end
-
-class ActionCable::TestCase < ActiveSupport::TestCase
- if ENV["FAYE"].present?
- include EventMachineConcurrencyHelpers
- else
- include ConcurrentRubyConcurrencyHelpers
- end
def wait_for_executor(executor)
+ # do not wait forever, wait 2s
+ timeout = 2
until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1
+ timeout -= 0.1
+ raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end
end
end