From 02f25a226f6418f95d7ea1c62f68b2f8688ae37a Mon Sep 17 00:00:00 2001 From: Jon Leighton Date: Fri, 14 Sep 2012 13:35:24 +0100 Subject: Start to tease out transaction handling into a state machine --- .../abstract/database_statements.rb | 107 +++++------------- .../connection_adapters/abstract/transaction.rb | 124 +++++++++++++++++++++ 2 files changed, 152 insertions(+), 79 deletions(-) create mode 100644 activerecord/lib/active_record/connection_adapters/abstract/transaction.rb (limited to 'activerecord/lib/active_record/connection_adapters/abstract') diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 4e1f0e1d62..7cfaf3b0e5 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -3,8 +3,8 @@ module ActiveRecord module DatabaseStatements def initialize super - @_current_transaction_records = [] - @transaction_joinable = nil + @transaction_joinable = nil + @transaction = Transaction::Closed.new(self) end # Converts an arel AST to SQL @@ -179,68 +179,53 @@ module ActiveRecord transaction_open = false begin - if requires_new || open_transactions == 0 - if open_transactions == 0 - begin_db_transaction - elsif requires_new - create_savepoint - end - increment_open_transactions + if @transaction.closed? || requires_new + @transaction = @transaction.begin transaction_open = true - @_current_transaction_records.push([]) end + yield - rescue Exception => database_transaction_rollback - if transaction_open && !outside_transaction? + rescue Exception => error + if !outside_transaction? && transaction_open + @transaction = @transaction.rollback transaction_open = false - decrement_open_transactions - if open_transactions == 0 - rollback_db_transaction - rollback_transaction_records(true) - else - rollback_to_savepoint - rollback_transaction_records(false) - end end - raise unless database_transaction_rollback.is_a?(ActiveRecord::Rollback) + + raise unless error.is_a?(ActiveRecord::Rollback) end + ensure @transaction_joinable = last_transaction_joinable if outside_transaction? @open_transactions = 0 - elsif transaction_open - decrement_open_transactions + @transaction = Transactions::Closed.new(self) + elsif @transaction.open? && transaction_open begin - if open_transactions == 0 - commit_db_transaction - commit_transaction_records - else - release_savepoint - save_point_records = @_current_transaction_records.pop - unless save_point_records.blank? - @_current_transaction_records.push([]) if @_current_transaction_records.empty? - @_current_transaction_records.last.concat(save_point_records) - end - end + @transaction = @transaction.commit rescue Exception - if open_transactions == 0 - rollback_db_transaction - rollback_transaction_records(true) - else - rollback_to_savepoint - rollback_transaction_records(false) - end + @transaction = @transaction.parent raise end end end + def transaction_state #:nodoc: + @transaction + end + + def begin_transaction #:nodoc: + @transaction = @transaction.begin + end + + def rollback_transaction #:nodoc: + @transaction = @transaction.rollback + end + # Register a record with the current transaction so that its after_commit and after_rollback callbacks # can be called. def add_transaction_record(record) - last_batch = @_current_transaction_records.last - last_batch << record if last_batch + @transaction.add_record(record) end # Begins the transaction (and turns off auto-committing). @@ -354,42 +339,6 @@ module ActiveRecord update_sql(sql, name) end - # Send a rollback message to all records after they have been rolled back. If rollback - # is false, only rollback records since the last save point. - def rollback_transaction_records(rollback) - if rollback - records = @_current_transaction_records.flatten - @_current_transaction_records.clear - else - records = @_current_transaction_records.pop - end - - unless records.blank? - records.uniq.each do |record| - begin - record.rolledback!(rollback) - rescue => e - record.logger.error(e) if record.respond_to?(:logger) && record.logger - end - end - end - end - - # Send a commit message to all records after they have been committed. - def commit_transaction_records - records = @_current_transaction_records.flatten - @_current_transaction_records.clear - unless records.blank? - records.uniq.each do |record| - begin - record.committed! - rescue => e - record.logger.error(e) if record.respond_to?(:logger) && record.logger - end - end - end - end - def sql_for_insert(sql, pk, id_value, sequence_name, binds) [sql, binds] end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/transaction.rb b/activerecord/lib/active_record/connection_adapters/abstract/transaction.rb new file mode 100644 index 0000000000..0a4649ffa7 --- /dev/null +++ b/activerecord/lib/active_record/connection_adapters/abstract/transaction.rb @@ -0,0 +1,124 @@ +module ActiveRecord + module ConnectionAdapters + module Transaction # :nodoc: + class State + attr_reader :connection + + def initialize(connection) + @connection = connection + end + end + + class Closed < State + def begin + Open.new(connection, self) + end + + def closed? + true + end + + def open? + false + end + + # This is a noop when there are no open transactions + def add_record(record) + end + end + + class Open < State + attr_reader :parent, :records + + def initialize(connection, parent) + super connection + + @parent = parent + @records = [] + + if parent.open? + connection.create_savepoint + else + connection.begin_db_transaction + end + + connection.increment_open_transactions + end + + def begin + Open.new(connection, self) + end + + def rollback + connection.decrement_open_transactions + + if parent.open? + connection.rollback_to_savepoint + else + connection.rollback_db_transaction + end + + rollback_records + parent + end + + def commit + connection.decrement_open_transactions + + begin + if parent.open? + connection.release_savepoint + records.each { |r| parent.add_record(r) } + else + connection.commit_db_transaction + commit_records + end + rescue Exception + if parent.open? + connection.rollback_to_savepoint + else + connection.rollback_db_transaction + end + + rollback_records + raise + end + + parent + end + + def add_record(record) + records << record + end + + def rollback_records + records.uniq.each do |record| + begin + record.rolledback!(parent.closed?) + rescue => e + record.logger.error(e) if record.respond_to?(:logger) && record.logger + end + end + end + + def commit_records + records.uniq.each do |record| + begin + record.committed! + rescue => e + record.logger.error(e) if record.respond_to?(:logger) && record.logger + end + end + end + + def closed? + false + end + + def open? + true + end + end + end + end +end -- cgit v1.2.3