From d6466beb9fff9f2ba4f73673e65f087dd6bba488 Mon Sep 17 00:00:00 2001 From: eileencodes Date: Mon, 20 Feb 2017 13:35:19 -0500 Subject: Ensure test threads share a DB connection This ensures multiple threads inside a transactional test to see consistent database state. When a system test starts Puma spins up one thread and Capybara spins up another thread. Because of this when tests are run the database cannot see what was inserted into the database on teardown. This is because there are two threads using two different connections. This change uses the statement cache to lock the threads to using a single connection ID instead of each not being able to see each other. This code only runs in the fixture setup and teardown so it does not affect real production databases. When a transaction is opened we set `lock_thread` to `Thread.current` so we can keep track of which connection the thread is using. When we rollback the transaction we unlock the thread and then there will be no left-over data in the database because the transaction will roll back the correct connections. [ Eileen M. Uchitelle, Matthew Draper ] --- .../abstract/connection_pool.rb | 12 +++++++- .../connection_adapters/abstract/query_cache.rb | 36 ++++++++++++---------- .../connection_adapters/abstract_adapter.rb | 7 ++++- .../connection_adapters/postgresql_adapter.rb | 34 +++++++++++--------- activerecord/lib/active_record/fixtures.rb | 3 ++ activerecord/test/cases/fixtures_test.rb | 6 ++++ activerecord/test/cases/query_cache_test.rb | 12 ++++++++ .../test/cases/scoping/default_scoping_test.rb | 2 ++ 8 files changed, 80 insertions(+), 32 deletions(-) (limited to 'activerecord') diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index ce4721c99d..3f2e86a98d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -353,6 +353,16 @@ module ActiveRecord @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self + + @lock_thread = false + end + + def lock_thread=(lock_thread) + if lock_thread + @lock_thread = Thread.current + else + @lock_thread = nil + end end # Retrieve the connection associated with the current thread, or call @@ -361,7 +371,7 @@ module ActiveRecord # #connection can be called any number of times; the connection is # held in a cache keyed by a thread. def connection - @thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout + @thread_cached_conns[connection_cache_key(@lock_thread || Thread.current)] ||= checkout end # Returns true if there is an open connection being used for the current thread. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 7eab7de5d3..e53ba4e666 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -83,7 +83,9 @@ module ActiveRecord # the same SQL query and repeatedly return the same result each time, silently # undermining the randomness you were expecting. def clear_query_cache - @query_cache.clear + @lock.synchronize do + @query_cache.clear + end end def select_all(arel, name = nil, binds = [], preparable: nil) @@ -99,21 +101,23 @@ module ActiveRecord private def cache_sql(sql, name, binds) - result = - if @query_cache[sql].key?(binds) - ActiveSupport::Notifications.instrument( - "sql.active_record", - sql: sql, - binds: binds, - name: name, - connection_id: object_id, - cached: true, - ) - @query_cache[sql][binds] - else - @query_cache[sql][binds] = yield - end - result.dup + @lock.synchronize do + result = + if @query_cache[sql].key?(binds) + ActiveSupport::Notifications.instrument( + "sql.active_record", + sql: sql, + binds: binds, + name: name, + connection_id: object_id, + cached: true, + ) + @query_cache[sql][binds] + else + @query_cache[sql][binds] = yield + end + result.dup + end end # If arel is locked this is a SELECT ... FOR UPDATE or somesuch. Such diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 6b14a498df..b31ce0a181 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -107,6 +107,7 @@ module ActiveRecord @schema_cache = SchemaCache.new self @quoted_column_names, @quoted_table_names = {}, {} @visitor = arel_visitor + @lock = Monitor.new if self.class.type_cast_config_to_boolean(config.fetch(:prepared_statements) { true }) @prepared_statements = true @@ -605,7 +606,11 @@ module ActiveRecord binds: binds, type_casted_binds: type_casted_binds, statement_name: statement_name, - connection_id: object_id) { yield } + connection_id: object_id) do + @lock.synchronize do + yield + end + end rescue => e raise translate_exception_class(e, sql) end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 36c9815547..c89e29ba44 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -236,7 +236,9 @@ module ActiveRecord # Clears the prepared statements cache. def clear_cache! - @statements.clear + @lock.synchronize do + @statements.clear + end end def truncate(table_name, name = nil) @@ -637,8 +639,10 @@ module ActiveRecord if in_transaction? raise ActiveRecord::PreparedStatementCacheExpired.new(e.cause.message) else - # outside of transactions we can simply flush this query and retry - @statements.delete sql_key(sql) + @lock.synchronize do + # outside of transactions we can simply flush this query and retry + @statements.delete sql_key(sql) + end retry end end @@ -674,19 +678,21 @@ module ActiveRecord # Prepare the statement if it hasn't been prepared, return # the statement key. def prepare_statement(sql) - sql_key = sql_key(sql) - unless @statements.key? sql_key - nextkey = @statements.next_key - begin - @connection.prepare nextkey, sql - rescue => e - raise translate_exception_class(e, sql) + @lock.synchronize do + sql_key = sql_key(sql) + unless @statements.key? sql_key + nextkey = @statements.next_key + begin + @connection.prepare nextkey, sql + rescue => e + raise translate_exception_class(e, sql) + end + # Clear the queue + @connection.get_last_result + @statements[sql_key] = nextkey end - # Clear the queue - @connection.get_last_result - @statements[sql_key] = nextkey + @statements[sql_key] end - @statements[sql_key] end # Connects to a PostgreSQL server and sets up the adapter depending on the diff --git a/activerecord/lib/active_record/fixtures.rb b/activerecord/lib/active_record/fixtures.rb index 91d8054ef2..e79167d568 100644 --- a/activerecord/lib/active_record/fixtures.rb +++ b/activerecord/lib/active_record/fixtures.rb @@ -970,6 +970,7 @@ module ActiveRecord @fixture_connections = enlist_fixture_connections @fixture_connections.each do |connection| connection.begin_transaction joinable: false + connection.pool.lock_thread = true end # When connections are established in the future, begin a transaction too @@ -985,6 +986,7 @@ module ActiveRecord if connection && !@fixture_connections.include?(connection) connection.begin_transaction joinable: false + connection.pool.lock_thread = true @fixture_connections << connection end end @@ -1007,6 +1009,7 @@ module ActiveRecord ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber @fixture_connections.each do |connection| connection.rollback_transaction if connection.transaction_open? + connection.pool.lock_thread = false end @fixture_connections.clear else diff --git a/activerecord/test/cases/fixtures_test.rb b/activerecord/test/cases/fixtures_test.rb index 61e596e208..afe761cb55 100644 --- a/activerecord/test/cases/fixtures_test.rb +++ b/activerecord/test/cases/fixtures_test.rb @@ -640,6 +640,8 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase def test_transaction_created_on_connection_notification connection = stub(transaction_open?: false) connection.expects(:begin_transaction).with(joinable: false) + pool = connection.stubs(:pool).returns(ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base.connection_pool.spec)) + pool.stubs(:lock_thread=).with(false) fire_connection_notification(connection) end @@ -647,12 +649,16 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase # Mocha is not thread-safe so define our own stub to test connection = Class.new do attr_accessor :rollback_transaction_called + attr_accessor :pool def transaction_open?; true; end def begin_transaction(*args); end def rollback_transaction(*args) @rollback_transaction_called = true end end.new + connection.pool = Class.new do + def lock_thread=(lock_thread); false; end + end.new fire_connection_notification(connection) teardown_fixtures assert(connection.rollback_transaction_called, "Expected #rollback_transaction to be called but was not") diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index d8cf235000..494663eb04 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -532,4 +532,16 @@ class QueryCacheExpiryTest < ActiveRecord::TestCase end end end + + test "threads use the same connection" do + @connection_1 = ActiveRecord::Base.connection.object_id + + thread_a = Thread.new do + @connection_2 = ActiveRecord::Base.connection.object_id + end + + thread_a.join + + assert_equal @connection_1, @connection_2 + end end diff --git a/activerecord/test/cases/scoping/default_scoping_test.rb b/activerecord/test/cases/scoping/default_scoping_test.rb index 3a04f4bf7d..14fb2fbbfa 100644 --- a/activerecord/test/cases/scoping/default_scoping_test.rb +++ b/activerecord/test/cases/scoping/default_scoping_test.rb @@ -10,6 +10,8 @@ require "concurrent/atomic/cyclic_barrier" class DefaultScopingTest < ActiveRecord::TestCase fixtures :developers, :posts, :comments + self.use_transactional_tests = false + def test_default_scope expected = Developer.all.merge!(order: "salary DESC").to_a.collect(&:salary) received = DeveloperOrderedBySalary.all.collect(&:salary) -- cgit v1.2.3