aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/lib/active_record/relation/batches.rb
blob: 49b01909c6b3570cc2be5a406d546d31facc78f7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
module ActiveRecord
  module Batches
    # Looping through a collection of records from the database
    # (using the +all+ method, for example) is very inefficient
    # since it will try to instantiate all the objects at once.
    #
    # In that case, batch processing methods allow you to work
    # with the records in batches, thereby greatly reducing memory consumption.
    #
    # The #find_each method uses #find_in_batches with a batch size of 1000 (or as
    # specified by the +:batch_size+ option) and yields the records of every
    # batch one at a time.
    #
    #   Person.find_each do |person|
    #     person.do_awesome_stuff
    #   end
    #
    #   Person.where("age > 21").find_each do |person|
    #     person.party_all_night!
    #   end
    #
    # If you do not provide a block to #find_each, it will return an Enumerator
    # for chaining with other methods:
    #
    #   Person.find_each.with_index do |person, index|
    #     person.award_trophy(index + 1)
    #   end
    #
    # ==== Options
    # * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
    # * <tt>:start</tt> - Specifies the primary key value to start from,
    #   inclusive of that value. This is especially useful if you want
    #   multiple workers dealing with the same processing queue, e.g. by
    #   giving each worker a different +:start+ value.
    #
    #   # Let's process batches of 2000 records, starting with the record
    #   # whose primary key is 2000
    #   Person.find_each(start: 2000, batch_size: 2000) do |person|
    #     person.party_all_night!
    #   end
    #
    # NOTE: It's not possible to set the order. That is automatically set to
    # ascending on the primary key ("id ASC") to make the batch ordering
    # work. This also means that this method only works with integer-based
    # primary keys.
    #
    # NOTE: You can't set the limit either, that's used to control
    # the batch sizes.
    def find_each(options = {})
      if block_given?
        find_in_batches(options) do |records|
          records.each { |record| yield record }
        end
      else
        enum_for :find_each, options
      end
    end

    # Yields each batch of records that was found by the find +options+ as
    # an array.
    #
    #   Person.where("age > 21").find_in_batches do |group|
    #     sleep(50) # Make sure it doesn't get too crowded in there!
    #     group.each { |person| person.party_all_night! }
    #   end
    #
    # If you do not provide a block to #find_in_batches, it will return an
    # Enumerator that yields the batches, mirroring #find_each:
    #
    #   Person.find_in_batches.with_index do |group, batch|
    #     puts "Processing group ##{batch}"
    #   end
    #
    # The +options+ hash is only read, never modified, so the same hash can
    # safely be reused for several calls.
    #
    # ==== Options
    # * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
    # * <tt>:start</tt> - Specifies the primary key value to start from,
    #   inclusive of that value. This is especially useful if you want
    #   multiple workers dealing with the same processing queue, e.g. by
    #   giving each worker a different +:start+ value.
    #
    #   # Let's process batches of 2000 records, starting with the record
    #   # whose primary key is 2000
    #   Person.find_in_batches(start: 2000, batch_size: 2000) do |group|
    #     group.each { |person| person.party_all_night! }
    #   end
    #
    # Raises an error for any option key other than +:start+ and
    # +:batch_size+, and a RuntimeError when a further batch has to be
    # fetched but the last loaded record has no primary key value.
    #
    # NOTE: It's not possible to set the order. That is automatically set to
    # ascending on the primary key ("id ASC") to make the batch ordering
    # work. This also means that this method only works with integer-based
    # primary keys. When a logger is available, a warning is logged if the
    # scope already carries an order or a limit.
    #
    # NOTE: You can't set the limit either, that's used to control
    # the batch sizes.
    def find_in_batches(options = {})
      options.assert_valid_keys(:start, :batch_size)

      # Without a block there is nothing to yield to; hand back an Enumerator
      # (as #find_each does) instead of failing with LocalJumpError.
      return to_enum(:find_in_batches, options) unless block_given?

      if logger && (arel.orders.present? || arel.taken.present?)
        logger.warn("Scoped order and limit are ignored, it's forced to be batch order and batch size")
      end

      # Read the options without deleting them so the caller's hash is left
      # untouched (and frozen hashes are accepted).
      start = options[:start]
      batch_size = options[:batch_size] || 1000

      relation = reorder(batch_order).limit(batch_size)
      records = start ? relation.where(table[primary_key].gteq(start)).to_a : relation.to_a

      while records.any?
        # Both values are read before yielding, so they describe the batch as
        # it was loaded even if the block modifies the array.
        records_size = records.size
        primary_key_offset = records.last.id

        yield records

        # A short batch means the scope is exhausted.
        break if records_size < batch_size

        unless primary_key_offset
          raise "Primary key not included in the custom select clause"
        end

        # Continue strictly after the last primary key that was processed.
        records = relation.where(table[primary_key].gt(primary_key_offset)).to_a
      end
    end

    private

    # SQL ORDER BY fragment that sorts ascending by the quoted primary key of
    # the quoted table; used to give batches a stable order.
    def batch_order
      "#{quoted_table_name}.#{quoted_primary_key} ASC"
    end
  end
end