diff options
35 files changed, 493 insertions, 376 deletions
diff --git a/.travis.yml b/.travis.yml index f01b58ecb3..8bebd3f484 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,9 +36,7 @@ env: - "GEM=railties" - "GEM=ap" - "GEM=ac" - - "GEM=ac FAYE=1" - "GEM=ac:integration" - - "GEM=ac:integration FAYE=1" - "GEM=am,amo,as,av,aj" - "GEM=as PRESERVE_TIMEZONES=1" - "GEM=ar:mysql2" @@ -69,7 +67,6 @@ matrix: - rvm: ruby-head - rvm: jruby-9.0.5.0 - env: "GEM=ac:integration" - - env: "GEM=ac:integration FAYE=1" fast_finish: true notifications: @@ -23,9 +23,6 @@ task default: %w(test test:isolated) FRAMEWORKS.each do |project| system(%(cd #{project} && #{$0} #{task_name} --trace)) || errors << project end - if task_name =~ /test/ - system(%(cd actioncable && env FAYE=1 #{$0} #{task_name} --trace)) || errors << "actioncable-faye" - end fail("Errors in #{errors.join(', ')}") unless errors.empty? end end @@ -36,7 +33,6 @@ task :smoke do system %(cd #{project} && #{$0} test:isolated --trace) end system %(cd activerecord && #{$0} sqlite3:isolated_test --trace) - system %(cd actioncable && env FAYE=1 #{$0} test:isolated --trace) end desc "Install gems for all projects." diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index dec6f7c027..137c88d91b 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,16 @@ +* Prevent race where the client could receive and act upon a + subscription confirmation before the channel's `subscribed` method + completed. + + Fixes #25381. + + *Vladimir Dementyev* + +* Buffer writes to websocket connections, to avoid blocking threads + that could be doing more useful things. + + *Matthew Draper*, *Tinco Andringa* + * Protect against concurrent writes to a websocket connection from multiple threads; the underlying OS write is not always threadsafe. diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -201,12 +213,18 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -230,18 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 13deb62662..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 5f813cf8e0..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,8 +8,6 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel - autoload :FayeClientSocket - autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 4c7fcc1434..06f4f5edd3 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb deleted file mode 100644 index 06e92c5d52..0000000000 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ /dev/null @@ -1,48 +0,0 @@ -require "faye/websocket" - -module ActionCable - module Connection - class FayeClientSocket - def initialize(env, event_target, stream_event_loop, protocols) - @env = env - @event_target = event_target - @protocols = protocols - - @faye = nil - end - - def alive? - @faye && @faye.ready_state == Faye::WebSocket::API::OPEN - end - - def transmit(data) - connect - @faye.send data - end - - def close - @faye && @faye.close - end - - def protocol - @faye && @faye.protocol - end - - def rack_response - connect - @faye.rack_response - end - - private - def connect - return if @faye - @faye = Faye::WebSocket.new(@env, @protocols) - - @faye.on(:open) { |event| @event_target.on_open } - @faye.on(:message) { |event| @event_target.on_message(event.data) } - @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } - @faye.on(:error) { |event| @event_target.on_error(event.message) } - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb deleted file mode 100644 index cfbe26ee6a..0000000000 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ /dev/null @@ -1,44 +0,0 @@ -require "thread" - -require "eventmachine" -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -module ActionCable - module Connection - class FayeEventLoop - @@mutex = Mutex.new - - def timer(interval, &block) - ensure_reactor_running - EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) - end - - def post(task = nil, &block) - task ||= block - - ensure_reactor_running - ::EM.next_tick(&task) - end - - private - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end - - class EMTimer - def initialize(inner) - @inner = inner - end - - def shutdown - @inner.cancel - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 5a2aace0ba..d66e1b4e41 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -14,6 +14,9 @@ module ActionCable @rack_hijack_io = nil @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -30,14 +33,62 @@ module ActionCable end def write(data) - @write_lock.synchronize do - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 106b948c45..eec24638b6 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -5,7 +5,7 @@ module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +20,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -39,6 +40,15 @@ module ActionCable wakeup end + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end + end + wakeup + end + def stop @stopping = true wakeup if @nio @@ -52,6 +62,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +94,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,10 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 52d8daad4b..382141b89f 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index dd059a553b..67ada7cc2e 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -49,7 +49,7 @@ module ActionCable end def event_loop - @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 7153593d4c..dc146f07b0 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,7 +4,7 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :use_faye, :connection_class, :worker_pool_size + attr_accessor :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :cable, :url, :mount_path @@ -35,22 +35,6 @@ module ActionCable adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end - - def event_loop_class - if use_faye - ActionCable::Connection::FayeEventLoop - else - ActionCable::Connection::StreamEventLoop - end - end - - def client_socket_class - if use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - end end end end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index 2bb3214f74..9a3a3581e6 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase @channel = ChatChannel.new @connection, "{id: 1}", id: 1 end - test "should subscribe to a channel on initialize" do + test "should subscribe to a channel" do + @channel.subscribe_to_channel assert_equal 1, @channel.room.id end test "on subscribe callbacks" do + @channel.subscribe_to_channel assert @channel.subscribed end @@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase end test "unsubscribing from a channel" do + @channel.subscribe_to_channel + assert @channel.room assert @channel.subscribed? @@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase assert_equal expected, @connection.last_transmission end - test "subscription confirmation" do + test "do not send subscription confirmation on initialize" do + assert_nil @connection.last_transmission + end + + test "subscription confirmation on subscribe_to_channel" do expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + @channel.subscribe_to_channel assert_equal expected, @connection.last_transmission end @@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "notification for transmit_subscription_confirmation" do begin + @channel.subscribe_to_channel + events = [] ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| events << ActiveSupport::Notifications::Event.new(*args) diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 529abb9535..2ee711fd29 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) channel = ChatChannel.new @connection, "{id: 1}", id: 1 + channel.subscribe_to_channel channel.unsubscribe_from_channel assert_equal [], channel.send(:active_periodic_timers) end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index faf35ad048..99c4a7603a 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "subscription rejection" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission @@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase test "does not execute action if subscription is rejected" do @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", id: 1 + @channel.subscribe_to_channel expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index da26f81a5c..31dcde2e29 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -53,6 +53,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -64,6 +65,7 @@ module ActionCable::StreamTests connection = TestConnection.new connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = SymbolChannel.new connection, "" + channel.subscribe_to_channel connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel @@ -76,6 +78,7 @@ module ActionCable::StreamTests connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" + channel.subscribe_to_channel channel.stream_for Room.new(1) end end @@ -84,7 +87,9 @@ module ActionCable::StreamTests run_in_eventmachine do connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", id: 1 + channel = ChatChannel.new connection, "{id: 1}", id: 1 + channel.subscribe_to_channel + assert_nil connection.last_transmission wait_for_async @@ -114,7 +119,7 @@ module ActionCable::StreamTests end end - require "action_cable/subscription_adapter/inline" + require "action_cable/subscription_adapter/async" class UserCallbackChannel < ActionCable::Channel::Base def subscribed @@ -124,9 +129,16 @@ module ActionCable::StreamTests end end - class StreamEncodingTest < ActionCable::TestCase + class MultiChatChannel < ActionCable::Channel::Base + def subscribed + stream_from "main_room" + stream_from "test_all_rooms" + end + end + + class StreamFromTest < ActionCable::TestCase setup do - @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end @@ -153,6 +165,17 @@ module ActionCable::StreamTests end end + test "subscription confirmation should only be sent out once with muptiple stream_from" do + run_in_eventmachine do + connection = open_connection + expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" } + connection.websocket.expects(:transmit).with(expected.to_json) + receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {}) + + wait_for_async + end + end + private def subscribe_to(connection, identifiers:) receive connection, command: "subscribe", identifiers: identifiers diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 2e3821828f..9f97e08f1f 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -39,7 +39,6 @@ class ClientTest < ActionCable::TestCase server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async") - server.config.use_faye = ENV["FAYE"].present? # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true diff --git a/actioncable/test/connection/client_socket_test.rb b/actioncable/test/connection/client_socket_test.rb index 5043a63370..dff7fefbfb 100644 --- a/actioncable/test/connection/client_socket_test.rb +++ b/actioncable/test/connection/client_socket_test.rb @@ -33,8 +33,6 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase end test "delegate socket errors to on_error handler" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection @@ -49,8 +47,6 @@ class ActionCable::Connection::ClientSocketTest < ActionCable::TestCase end test "closes hijacked i/o socket at shutdown" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection diff --git a/actioncable/test/connection/stream_test.rb b/actioncable/test/connection/stream_test.rb index 4128b32f15..36e1d3c095 100644 --- a/actioncable/test/connection/stream_test.rb +++ b/actioncable/test/connection/stream_test.rb @@ -34,8 +34,6 @@ class ActionCable::Connection::StreamTest < ActionCable::TestCase [ EOFError, Errno::ECONNRESET ].each do |closed_exception| test "closes socket on #{closed_exception}" do - skip if ENV["FAYE"].present? - run_in_eventmachine do connection = open_connection diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index b64ff33789..5bf2a151dc 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -10,12 +10,6 @@ class TestServer @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) @config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter) - @config.use_faye = ENV["FAYE"].present? - @config.client_socket_class = if @config.use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end @mutex = Monitor.new end @@ -25,10 +19,8 @@ class TestServer end def event_loop - @event_loop ||= if @config.use_faye - ActionCable::Connection::FayeEventLoop.new - else - ActionCable::Connection::StreamEventLoop.new + @event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop| + loop.instance_variable_set(:@executor, Concurrent.global_io_executor) end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 1538157995..ee62b9c088 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -11,7 +11,6 @@ module CommonSubscriptionAdapterTest def setup server = ActionCable::Server::Base.new server.config.cable = cable_config.with_indifferent_access - server.config.use_faye = ENV["FAYE"].present? adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 39855ea252..a47032753b 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,41 +13,7 @@ end # Require all the stubs and models Dir[File.dirname(__FILE__) + "/stubs/*.rb"].each { |file| require file } -if ENV["FAYE"].present? - require "faye/websocket" - class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end - end -end - -module EventMachineConcurrencyHelpers - def wait_for_async - EM.run_deferred_callbacks - end - - def run_in_eventmachine - failure = nil - EM.run do - begin - yield - rescue => ex - failure = ex - ensure - wait_for_async - EM.stop if EM.reactor_running? - end - end - raise failure if failure - end -end - -module ConcurrentRubyConcurrencyHelpers +class ActionCable::TestCase < ActiveSupport::TestCase def wait_for_async wait_for_executor Concurrent.global_io_executor end @@ -56,18 +22,14 @@ module ConcurrentRubyConcurrencyHelpers yield wait_for_async end -end - -class ActionCable::TestCase < ActiveSupport::TestCase - if ENV["FAYE"].present? - include EventMachineConcurrencyHelpers - else - include ConcurrentRubyConcurrencyHelpers - end def wait_for_executor(executor) + # do not wait forever, wait 2s + timeout = 2 until executor.completed_task_count == executor.scheduled_task_count sleep 0.1 + timeout -= 0.1 + raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0 end end end diff --git a/actionpack/lib/action_controller/metal/renderers.rb b/actionpack/lib/action_controller/metal/renderers.rb index 15377ddcb9..f8a037189c 100644 --- a/actionpack/lib/action_controller/metal/renderers.rb +++ b/actionpack/lib/action_controller/metal/renderers.rb @@ -71,8 +71,6 @@ module ActionController # format.csv { render csv: @csvable, filename: @csvable.name } # end # end - # To use renderers and their mime types in more concise ways, see - # <tt>ActionController::MimeResponds::ClassMethods.respond_to</tt> def self.add(key, &block) define_method(_render_with_renderer_method_name(key), &block) RENDERERS << key.to_sym diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 6ca53c72ce..2f8a89e88e 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -65,7 +65,7 @@ module ActiveRecord if @query_cache_enabled && !locked?(arel) arel, binds = binds_from_relation arel, binds sql = to_sql(arel, binds) - cache_sql(sql, binds) { super(sql, name, binds, preparable: preparable) } + cache_sql(sql, name, binds) { super(sql, name, binds, preparable: preparable) } else super end @@ -73,11 +73,17 @@ module ActiveRecord private - def cache_sql(sql, binds) + def cache_sql(sql, name, binds) result = if @query_cache[sql].key?(binds) - ActiveSupport::Notifications.instrument("sql.active_record", - sql: sql, binds: binds, name: "CACHE", connection_id: object_id) + ActiveSupport::Notifications.instrument( + "sql.active_record", + sql: sql, + binds: binds, + name: name, + connection_id: object_id, + cached: true, + ) @query_cache[sql][binds] else @query_cache[sql][binds] = yield diff --git a/activerecord/lib/active_record/explain_subscriber.rb b/activerecord/lib/active_record/explain_subscriber.rb index 706b57842f..abd8cfc8f2 100644 --- a/activerecord/lib/active_record/explain_subscriber.rb +++ b/activerecord/lib/active_record/explain_subscriber.rb @@ -18,10 +18,13 @@ module ActiveRecord # # On the other hand, we want to monitor the performance of our real database # queries, not the performance of the access to the query cache. - IGNORED_PAYLOADS = %w(SCHEMA EXPLAIN CACHE) + IGNORED_PAYLOADS = %w(SCHEMA EXPLAIN) EXPLAINED_SQLS = /\A\s*(with|select|update|delete|insert)\b/i def ignore_payload?(payload) - payload[:exception] || IGNORED_PAYLOADS.include?(payload[:name]) || payload[:sql] !~ EXPLAINED_SQLS + payload[:exception] || + payload[:cached] || + IGNORED_PAYLOADS.include?(payload[:name]) || + payload[:sql] !~ EXPLAINED_SQLS end ActiveSupport::Notifications.subscribe("sql.active_record", new) diff --git a/activerecord/lib/active_record/log_subscriber.rb b/activerecord/lib/active_record/log_subscriber.rb index f31931316c..b27e84a5be 100644 --- a/activerecord/lib/active_record/log_subscriber.rb +++ b/activerecord/lib/active_record/log_subscriber.rb @@ -35,6 +35,7 @@ module ActiveRecord return if IGNORE_PAYLOAD_NAMES.include?(payload[:name]) name = "#{payload[:name]} (#{event.duration.round(1)}ms)" + name = "CACHE #{name}" if payload[:cached] sql = payload[:sql] binds = nil diff --git a/activerecord/lib/active_record/migration.rb b/activerecord/lib/active_record/migration.rb index 05568039d8..627e93b5b6 100644 --- a/activerecord/lib/active_record/migration.rb +++ b/activerecord/lib/active_record/migration.rb @@ -1,4 +1,5 @@ require "set" +require "zlib" require "active_support/core_ext/module/attribute_accessors" require "active_support/core_ext/regexp" diff --git a/activerecord/test/cases/adapters/postgresql/transaction_test.rb b/activerecord/test/cases/adapters/postgresql/transaction_test.rb index d992e22305..00119f13bb 100644 --- a/activerecord/test/cases/adapters/postgresql/transaction_test.rb +++ b/activerecord/test/cases/adapters/postgresql/transaction_test.rb @@ -1,5 +1,6 @@ require "cases/helper" require "support/connection_helper" +require "concurrent/atomic/cyclic_barrier" module ActiveRecord class PostgresqlTransactionTest < ActiveRecord::PostgreSQLTestCase @@ -61,26 +62,28 @@ module ActiveRecord test "raises Deadlocked when a deadlock is encountered" do with_warning_suppression do assert_raises(ActiveRecord::Deadlocked) do + barrier = Concurrent::CyclicBarrier.new(2) + s1 = Sample.create value: 1 s2 = Sample.create value: 2 thread = Thread.new do Sample.transaction do s1.lock! - sleep 1 + barrier.wait s2.update_attributes value: 1 end end - sleep 0.5 - - Sample.transaction do - s2.lock! - sleep 1 - s1.update_attributes value: 2 + begin + Sample.transaction do + s2.lock! + barrier.wait + s1.update_attributes value: 2 + end + ensure + thread.join end - - thread.join end end end diff --git a/activerecord/test/cases/test_case.rb b/activerecord/test/cases/test_case.rb index 60ac3e08a1..8eddc5a9ed 100644 --- a/activerecord/test/cases/test_case.rb +++ b/activerecord/test/cases/test_case.rb @@ -125,12 +125,9 @@ module ActiveRecord end def call(name, start, finish, message_id, values) - sql = values[:sql] - - # FIXME: this seems bad. we should probably have a better way to indicate - # the query was cached - return if "CACHE" == values[:name] + return if values[:cached] + sql = values[:sql] self.class.log_all << sql self.class.log << sql unless ignore.match?(sql) end diff --git a/activesupport/lib/active_support/callbacks.rb b/activesupport/lib/active_support/callbacks.rb index 6d39be1c1f..890b1cd73b 100644 --- a/activesupport/lib/active_support/callbacks.rb +++ b/activesupport/lib/active_support/callbacks.rb @@ -63,6 +63,8 @@ module ActiveSupport included do extend ActiveSupport::DescendantsTracker + class_attribute :__callbacks, instance_writer: false + self.__callbacks ||= {} end CALLBACK_FILTER_TYPES = [:before, :after, :around] @@ -86,21 +88,57 @@ module ActiveSupport # run_callbacks :save do # save # end - def run_callbacks(kind, &block) - send "_run_#{kind}_callbacks", &block - end - - private + # + #-- + # + # As this method is used in many places, and often wraps large portions of + # user code, it has an additional design goal of minimizing its impact on + # the visible call stack. An exception from inside a :before or :after + # callback can be as noisy as it likes -- but when control has passed + # smoothly through and into the supplied block, we want as little evidence + # as possible that we were here. + def run_callbacks(kind) + callbacks = __callbacks[kind.to_sym] + + if callbacks.empty? + yield if block_given? + else + env = Filters::Environment.new(self, false, nil) + next_sequence = callbacks.compile + + invoke_sequence = Proc.new do + skipped = nil + while true + current, next_sequence = next_sequence, next_sequence.nested + current.invoke_before(env) + if current.final? + env.value = !env.halted && (!block_given? || yield) + elsif current.skip?(env) + (skipped ||= []) << current + next + else + expanded = current.expand_call_template(env, invoke_sequence) + expanded.shift.send(*expanded, &expanded.shift) + end + current.invoke_after(env) + skipped.pop.invoke_after(env) while skipped && skipped.first + break env.value + end + end - def __run_callbacks__(callbacks, &block) - if callbacks.empty? - yield if block_given? + # Common case: no 'around' callbacks defined + if next_sequence.final? + next_sequence.invoke_before(env) + env.value = !env.halted && (!block_given? || yield) + next_sequence.invoke_after(env) + env.value else - runner = callbacks.compile - e = Filters::Environment.new(self, false, nil, block) - runner.call(e).value + invoke_sequence.call end end + end + + private # A hook invoked every time a before callback is halted. # This can be overridden in ActiveSupport::Callbacks implementors in order @@ -118,16 +156,7 @@ module ActiveSupport end module Filters - Environment = Struct.new(:target, :halted, :value, :run_block) - - class End - def call(env) - block = env.run_block - env.value = !env.halted && (!block || block.call) - env - end - end - ENDING = End.new + Environment = Struct.new(:target, :halted, :value) class Before def self.build(callback_sequence, user_callback, user_conditions, chain_config, filter) @@ -246,51 +275,6 @@ module ActiveSupport end private_class_method :simple end - - class Around - def self.build(callback_sequence, user_callback, user_conditions, chain_config) - if user_conditions.any? - halting_and_conditional(callback_sequence, user_callback, user_conditions) - else - halting(callback_sequence, user_callback) - end - end - - def self.halting_and_conditional(callback_sequence, user_callback, user_conditions) - callback_sequence.around do |env, &run| - target = env.target - value = env.value - halted = env.halted - - if !halted && user_conditions.all? { |c| c.call(target, value) } - user_callback.call(target, value) { - run.call.value - } - env - else - run.call - end - end - end - private_class_method :halting_and_conditional - - def self.halting(callback_sequence, user_callback) - callback_sequence.around do |env, &run| - target = env.target - value = env.value - - if env.halted - run.call - else - user_callback.call(target, value) { - run.call.value - } - env - end - end - end - private_class_method :halting - end end class Callback #:nodoc:# @@ -349,64 +333,23 @@ module ActiveSupport # Wraps code with filter def apply(callback_sequence) user_conditions = conditions_lambdas - user_callback = make_lambda @filter + user_callback = CallTemplate.build(@filter, self) case kind when :before - Filters::Before.build(callback_sequence, user_callback, user_conditions, chain_config, @filter) + Filters::Before.build(callback_sequence, user_callback.make_lambda, user_conditions, chain_config, @filter) when :after - Filters::After.build(callback_sequence, user_callback, user_conditions, chain_config) + Filters::After.build(callback_sequence, user_callback.make_lambda, user_conditions, chain_config) when :around - Filters::Around.build(callback_sequence, user_callback, user_conditions, chain_config) + callback_sequence.around(user_callback, user_conditions) end end - private - - def invert_lambda(l) - lambda { |*args, &blk| !l.call(*args, &blk) } - end - - # Filters support: - # - # Symbols:: A method to call. - # Strings:: Some content to evaluate. - # Procs:: A proc to call with the object. - # Objects:: An object with a <tt>before_foo</tt> method on it to call. - # - # All of these objects are converted into a lambda and handled - # the same after this point. - def make_lambda(filter) - case filter - when Symbol - lambda { |target, _, &blk| target.send filter, &blk } - when String - l = eval "lambda { |value| #{filter} }" - lambda { |target, value| target.instance_exec(value, &l) } - when Conditionals::Value then filter - when ::Proc - if filter.arity > 1 - return lambda { |target, _, &block| - raise ArgumentError unless block - target.instance_exec(target, block, &filter) - } - end - - if filter.arity <= 0 - lambda { |target, _| target.instance_exec(&filter) } - else - lambda { |target, _| target.instance_exec(target, &filter) } - end - else - scopes = Array(chain_config[:scope]) - method_to_call = scopes.map { |s| public_send(s) }.join("_") - - lambda { |target, _, &blk| - filter.public_send method_to_call, target, &blk - } - end - end + def current_scopes + Array(chain_config[:scope]).map { |s| public_send(s) } + end + private def compute_identifier(filter) case filter when String, ::Proc @@ -417,17 +360,116 @@ module ActiveSupport end def conditions_lambdas - @if.map { |c| make_lambda c } + - @unless.map { |c| invert_lambda make_lambda c } + @if.map { |c| CallTemplate.build(c, self).make_lambda } + + @unless.map { |c| CallTemplate.build(c, self).inverted_lambda } end end + # A future invocation of user-supplied code (either as a callback, + # or a condition filter). + class CallTemplate # :nodoc: + def initialize(target, method, arguments, block) + @override_target = target + @method_name = method + @arguments = arguments + @override_block = block + end + + # Return the parts needed to make this call, with the given + # input values. + # + # Returns an array of the form: + # + # [target, block, method, *arguments] + # + # This array can be used as such: + # + # target.send(method, *arguments, &block) + # + # The actual invocation is left up to the caller to minimize + # call stack pollution. + def expand(target, value, block) + result = @arguments.map { |arg| + case arg + when :value; value + when :target; target + when :block; block || raise(ArgumentError) + end + } + + result.unshift @method_name + result.unshift @override_block || block + result.unshift @override_target || target + + # target, block, method, *arguments = result + # target.send(method, *arguments, &block) + result + end + + # Return a lambda that will make this call when given the input + # values. + def make_lambda + lambda do |target, value, &block| + c = expand(target, value, block) + c.shift.send(*c, &c.shift) + end + end + + # Return a lambda that will make this call when given the input + # values, but then return the boolean inverse of that result. + def inverted_lambda + lambda do |target, value, &block| + c = expand(target, value, block) + ! c.shift.send(*c, &c.shift) + end + end + + # Filters support: + # + # Symbols:: A method to call. + # Strings:: Some content to evaluate. + # Procs:: A proc to call with the object. + # Objects:: An object with a <tt>before_foo</tt> method on it to call. + # + # All of these objects are converted into a CallTemplate and handled + # the same after this point. + def self.build(filter, callback) + case filter + when Symbol + new(nil, filter, [], nil) + when String + new(nil, :instance_exec, [:value], compile_lambda(filter)) + when Conditionals::Value + new(filter, :call, [:target, :value], nil) + when ::Proc + if filter.arity > 1 + new(nil, :instance_exec, [:target, :block], filter) + elsif filter.arity > 0 + new(nil, :instance_exec, [:target], filter) + else + new(nil, :instance_exec, [], filter) + end + else + method_to_call = callback.current_scopes.join("_") + + new(filter, method_to_call, [:target], nil) + end + end + + def self.compile_lambda(filter) + eval("lambda { |value| #{filter} }") + end + end + # Execute before and after filters in a sequence instead of # chaining them with nested lambda calls, see: # https://github.com/rails/rails/issues/18011 - class CallbackSequence - def initialize(&call) - @call = call + class CallbackSequence # :nodoc: + def initialize(nested = nil, call_template = nil, user_conditions = nil) + @nested = nested + @call_template = call_template + @user_conditions = user_conditions + @before = [] @after = [] end @@ -442,19 +484,32 @@ module ActiveSupport self end - def around(&around) - CallbackSequence.new do |arg| - around.call(arg) { - call(arg) - } - end + def around(call_template, user_conditions) + CallbackSequence.new(self, call_template, user_conditions) + end + + def skip?(arg) + arg.halted || !@user_conditions.all? { |c| c.call(arg.target, arg.value) } + end + + def nested + @nested end - def call(arg) + def final? + !@call_template + end + + def expand_call_template(arg, block) + @call_template.expand(arg.target, arg.value, block) + end + + def invoke_before(arg) @before.each { |b| b.call(arg) } - value = @call.call(arg) + end + + def invoke_after(arg) @after.each { |a| a.call(arg) } - value end end @@ -503,7 +558,7 @@ module ActiveSupport def compile @callbacks || @mutex.synchronize do - final_sequence = CallbackSequence.new { |env| Filters::ENDING.call(env) } + final_sequence = CallbackSequence.new @callbacks ||= @chain.reverse.inject(final_sequence) do |callback_sequence, callback| callback.apply callback_sequence end @@ -747,12 +802,25 @@ module ActiveSupport options = names.extract_options! names.each do |name| - class_attribute "_#{name}_callbacks", instance_writer: false + name = name.to_sym + set_callbacks name, CallbackChain.new(name, options) module_eval <<-RUBY, __FILE__, __LINE__ + 1 def _run_#{name}_callbacks(&block) - __run_callbacks__(_#{name}_callbacks, &block) + run_callbacks #{name.inspect}, &block + end + + def self._#{name}_callbacks + get_callbacks(#{name.inspect}) + end + + def self._#{name}_callbacks=(value) + set_callbacks(#{name.inspect}, value) + end + + def _#{name}_callbacks + __callbacks[#{name.inspect}] end RUBY end @@ -761,11 +829,11 @@ module ActiveSupport protected def get_callbacks(name) # :nodoc: - send "_#{name}_callbacks" + __callbacks[name.to_sym] end def set_callbacks(name, callbacks) # :nodoc: - send "_#{name}_callbacks=", callbacks + self.__callbacks = __callbacks.merge(name.to_sym => callbacks) end def deprecated_false_terminator # :nodoc: diff --git a/activesupport/test/callbacks_test.rb b/activesupport/test/callbacks_test.rb index b4e98edd84..783952c8c7 100644 --- a/activesupport/test/callbacks_test.rb +++ b/activesupport/test/callbacks_test.rb @@ -56,6 +56,8 @@ module CallbacksTest end class Person < Record + attr_accessor :save_fails + [:before_save, :after_save].each do |callback_method| callback_method_sym = callback_method.to_sym send(callback_method, callback_symbol(callback_method_sym)) @@ -67,7 +69,9 @@ module CallbacksTest end def save - run_callbacks :save + run_callbacks :save do + raise "inside save" if save_fails + end end end @@ -222,6 +226,7 @@ module CallbacksTest class AroundPerson < MySuper attr_reader :history + attr_accessor :save_fails set_callback :save, :before, :nope, if: :no set_callback :save, :before, :nope, unless: :yes @@ -285,6 +290,7 @@ module CallbacksTest def save run_callbacks :save do + raise "inside save" if save_fails @history << "running" end end @@ -402,6 +408,71 @@ module CallbacksTest end end + class CallStackTest < ActiveSupport::TestCase + def test_tidy_call_stack + around = AroundPerson.new + around.save_fails = true + + exception = (around.save rescue $!) + + # Make sure we have the exception we're expecting + assert_equal "inside save", exception.message + + call_stack = exception.backtrace_locations + call_stack.pop caller_locations(0).size + + # Yes, this looks like an implementation test, but it's the least + # obtuse way of asserting that there aren't a load of entries in + # the call stack for each callback. + # + # If you've renamed a method, or squeezed more lines out, go ahead + # and update this assertion. But if you're here because a + # refactoring added new lines, please reconsider. + + # As shown here, our current budget is one line for run_callbacks + # itself, plus N+1 lines where N is the number of :around + # callbacks that have been invoked, if there are any (plus + # whatever the callbacks do themselves, of course). + + assert_equal [ + "block in save", + "block in run_callbacks", + "tweedle_deedle", + "block in run_callbacks", + "w0tyes", + "block in run_callbacks", + "tweedle_dum", + "block in run_callbacks", + ("call" if RUBY_VERSION < "2.3"), + "run_callbacks", + "save" + ].compact, call_stack.map(&:label) + end + + def test_short_call_stack + person = Person.new + person.save_fails = true + + exception = (person.save rescue $!) + + # Make sure we have the exception we're expecting + assert_equal "inside save", exception.message + + call_stack = exception.backtrace_locations + call_stack.pop caller_locations(0).size + + # This budget much simpler: with no :around callbacks invoked, + # there should be just one line. run_callbacks yields directly + # back to its caller. + + assert_equal [ + "block in save", + "run_callbacks", + "save" + ], call_stack.map(&:label) + end + end + class AroundCallbackResultTest < ActiveSupport::TestCase def test_save_around around = AroundPersonResult.new diff --git a/guides/source/active_record_querying.md b/guides/source/active_record_querying.md index bbe1b0decc..38b1ffc4c8 100644 --- a/guides/source/active_record_querying.md +++ b/guides/source/active_record_querying.md @@ -81,7 +81,6 @@ The methods are: * `reorder` * `reverse_order` * `select` -* `distinct` * `where` Finder methods that return a collection, such as `where` and `group`, return an instance of `ActiveRecord::Relation`. Methods that find a single entity, such as `find` and `first`, return a single instance of the model. |