aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/lib/active_record/insert_all.rb
blob: 6d14ff70b6d6c28ce255cb21961b631846b76e4d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# frozen_string_literal: true

module ActiveRecord
  class InsertAll
    attr_reader :model, :connection, :inserts, :on_duplicate, :returning, :unique_by

    # Captures the options for a bulk insert of +inserts+ on behalf of +model+.
    #
    # Raises ArgumentError when +inserts+ is blank. When +returning+ is not
    # given it defaults to the primary keys if the connection reports support
    # for insert returning, and to +false+ otherwise; an empty array is also
    # normalized to +false+. An +on_duplicate+ of +:update+ is downgraded to
    # +:skip+ when there are no updatable columns. The resulting options are
    # then validated against the connection.
    def initialize(model, inserts, on_duplicate:, returning: nil, unique_by: nil)
      raise ArgumentError, "Empty list of attributes passed" if inserts.blank?

      @model        = model
      @connection   = model.connection
      @inserts      = inserts
      @on_duplicate = on_duplicate
      @returning    = returning
      @unique_by    = unique_by

      # The defaults below read @model/@connection/@unique_by, so they must
      # run after the assignments above.
      if @returning.nil?
        @returning = connection.supports_insert_returning? ? primary_keys : false
      end
      @returning = false if @returning == []

      if @on_duplicate == :update && updatable_columns.empty?
        @on_duplicate = :skip
      end

      ensure_valid_options_for_connection!
    end

    # Sends the generated SQL to the connection via +exec_query+, labelled
    # "Bulk Insert", and returns whatever +exec_query+ returns.
    def execute
      connection.exec_query(to_sql, "Bulk Insert")
    end

    # Returns the attribute names of the first row as strings.
    def keys
      first_row = inserts.first
      first_row.keys.map { |name| name.to_s }
    end

    # Returns the inserted keys minus the read-only columns and the
    # +unique_by+ columns.
    def updatable_columns
      excluded = readonly_columns + unique_by_columns
      keys - excluded
    end

    # True when +on_duplicate+ is +:skip+.
    def skip_duplicates?
      :skip == on_duplicate
    end

    # True when +on_duplicate+ is +:update+.
    def update_duplicates?
      :update == on_duplicate
    end

    private
      # Checks the requested options against the capabilities the connection
      # reports.
      #
      # Raises ArgumentError when +returning+, skipping duplicates, upsert or
      # +unique_by+ is requested but not supported by the connection, and
      # NotImplementedError when +on_duplicate+ is not one of +:raise+,
      # +:skip+ or +:update+. Checks run in that order: returning,
      # on_duplicate, unique_by.
      def ensure_valid_options_for_connection!
        adapter = connection.class

        if returning && !connection.supports_insert_returning?
          raise ArgumentError, "#{adapter} does not support :returning"
        end

        case on_duplicate
        when :raise
          # No connection capability is checked for :raise.
        when :skip
          unless connection.supports_insert_on_duplicate_skip?
            raise ArgumentError, "#{adapter} does not support skipping duplicates"
          end
        when :update
          unless connection.supports_insert_on_duplicate_update?
            raise ArgumentError, "#{adapter} does not support upsert"
          end
        else
          raise NotImplementedError, "#{on_duplicate.inspect} is an unknown value for :on_duplicate. Valid values are :raise, :skip, and :update"
        end

        if unique_by && !connection.supports_insert_conflict_target?
          raise ArgumentError, "#{adapter} does not support :unique_by"
        end
      end

      # Wraps this object in a Builder and asks the connection to turn it
      # into the insert SQL.
      def to_sql
        builder = ActiveRecord::InsertAll::Builder.new(self)
        connection.build_insert_sql(builder)
      end

      # Primary keys followed by the model's read-only attributes.
      def readonly_columns
        readonly = model.readonly_attributes.to_a
        primary_keys + readonly
      end

      # Names of the +unique_by+ columns as strings, or an empty array when
      # no +unique_by+ option was given.
      def unique_by_columns
        return [] unless unique_by

        unique_by.fetch(:columns).map(&:to_s)
      end

      # The model's primary key passed through Array.wrap, so callers always
      # get an array.
      def primary_keys
        key = model.primary_key
        Array.wrap(key)
      end


      class Builder
        attr_reader :model

        delegate :skip_duplicates?, :update_duplicates?, to: :insert_all

        # Keeps a reference to +insert_all+ along with its model and
        # connection.
        def initialize(insert_all)
          @insert_all = insert_all
          @model      = insert_all.model
          @connection = insert_all.connection
        end

        # The +INTO+ fragment: the model's quoted table name followed by the
        # parenthesized column list.
        def into
          table = model.quoted_table_name
          "INTO #{table}(#{columns_list})"
        end

        # Builds the SQL values list for every row being inserted.
        #
        # For each row, every key is wrapped in a Relation::QueryAttribute
        # using the cast type looked up for its column, and the attribute's
        # database value is passed through the connection's
        # +with_yaml_fallback+. Values are emitted in the key order of
        # +insert_all.keys+.
        #
        # Raises UnknownAttributeError when a key has no entry in the table's
        # column hash, and ArgumentError when a row's keys differ from
        # +insert_all.keys+.
        def values_list
          columns = connection.schema_cache.columns_hash(model.table_name)
          keys = insert_all.keys.to_set

          # Reject unknown keys up front: otherwise a nil column would be
          # handed to lookup_cast_type_from_column and the failure would not
          # name the offending attribute.
          unknown_column = keys.find { |key| columns[key].nil? }
          raise UnknownAttributeError.new(model.new, unknown_column) if unknown_column

          types = keys.map { |key| [ key, connection.lookup_cast_type_from_column(columns[key]) ] }.to_h

          rows = insert_all.inserts.map do |attributes|
            attributes = attributes.stringify_keys

            unless attributes.keys.to_set == keys
              raise ArgumentError, "All objects being inserted must have the same keys"
            end

            keys.map do |key|
              bind = Relation::QueryAttribute.new(key, attributes[key], types[key])
              connection.with_yaml_fallback(bind.value_for_database)
            end
          end

          Arel::InsertManager.new.create_values_list(rows).to_sql
        end

        # Comma-separated quoted column names for the returning clause, or
        # +nil+ when +insert_all.returning+ is falsy.
        def returning
          columns = insert_all.returning
          return unless columns

          quote_columns(columns).join(",")
        end

        # The conflict target: the parenthesized, quoted conflict columns,
        # followed by a +WHERE+ condition when one is set. Returns +nil+ when
        # there are no conflict columns.
        def conflict_target
          columns = conflict_columns
          return unless columns

          target = +"(#{quote_columns(columns).join(',')})"
          condition = where
          target << " WHERE #{condition}" if condition
          target
        end

        # Quoted names of the columns +insert_all+ reports as updatable.
        def updatable_columns
          columns = insert_all.updatable_columns
          quote_columns(columns)
        end

        private
          attr_reader :connection, :insert_all

          # Comma-separated quoted names of all inserted keys.
          def columns_list
            quoted = quote_columns(insert_all.keys)
            quoted.join(",")
          end

          # Quotes each entry of +columns+ with the connection's
          # +quote_column_name+.
          def quote_columns(columns)
            columns.map { |column| connection.quote_column_name(column) }
          end

          # The columns that identify a conflicting row: the +unique_by+
          # columns when that option is set, otherwise the model's primary key
          # (wrapped in an array) when duplicates are updated, otherwise +nil+.
          #
          # The result is memoized with a +defined?+ guard so that a +nil+
          # result is cached too; +||=+ would redo the lookup on every call.
          def conflict_columns
            return @conflict_columns if defined?(@conflict_columns)

            columns = insert_all.unique_by.fetch(:columns) if insert_all.unique_by
            columns ||= Array.wrap(model.primary_key) if update_duplicates?
            @conflict_columns = columns
          end

          # The +:where+ entry of the +unique_by+ option, or a falsy value
          # when +unique_by+ is not set.
          def where
            options = insert_all.unique_by
            options && options[:where]
          end
      end
  end
end