diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb | 244 |
1 files changed, 185 insertions, 59 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 0a460bc086..e8a43e7bce 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -1,6 +1,6 @@ require 'active_record/connection_adapters/abstract_adapter' -require 'active_support/core_ext/kernel/requires' require 'active_support/core_ext/object/blank' +require 'active_record/connection_adapters/statement_pool' # Make sure we're using pg high enough for PGResult#values gem 'pg', '~> 0.11' @@ -199,7 +199,7 @@ module ActiveRecord # * <tt>:password</tt> - Defaults to nothing. # * <tt>:database</tt> - The name of the database. No default, must be provided. # * <tt>:schema_search_path</tt> - An optional schema search path for the connection given - # as a string of comma-separated schema names. This is backward-compatible with the <tt>:schema_order</tt> option. + # as a string of comma-separated schema names. This is backward-compatible with the <tt>:schema_order</tt> option. # * <tt>:encoding</tt> - An optional client encoding that is used in a <tt>SET client_encoding TO # <encoding></tt> call on the connection. # * <tt>:min_messages</tt> - An optional client min messages that is used in a @@ -247,6 +247,58 @@ module ActiveRecord true end + class StatementPool < ConnectionAdapters::StatementPool + def initialize(connection, max) + super + @counter = 0 + @cache = Hash.new { |h,pid| h[pid] = {} } + end + + def each(&block); cache.each(&block); end + def key?(key); cache.key?(key); end + def [](key); cache[key]; end + def length; cache.length; end + + def next_key + "a#{@counter + 1}" + end + + def []=(sql, key) + while @max <= cache.size + dealloc(cache.shift.last) + end + @counter += 1 + cache[sql] = key + end + + def clear + cache.each_value do |stmt_key| + dealloc stmt_key + end + cache.clear + end + + def delete(sql_key) + dealloc cache[sql_key] + cache.delete sql_key + end + + private + def cache + @cache[$$] + end + + def dealloc(key) + @connection.query "DEALLOCATE #{key}" if connection_active? + end + + def connection_active? + @connection.status == PGconn::CONNECTION_OK + rescue PGError + false + end + end + # Initializes and connects a PostgreSQL adapter. def initialize(connection, logger, connection_parameters, config) super(connection, logger) @@ -255,9 +307,10 @@ module ActiveRecord # @local_tz is initialized as nil to avoid warnings when connect tries to use it @local_tz = nil @table_alias_length = nil - @statements = {} connect + @statements = StatementPool.new @connection, + config.fetch(:statement_limit) { 1000 } if postgresql_version < 80200 raise "Your version of PostgreSQL (#{postgresql_version}) is too old, please upgrade!" @@ -266,11 +319,12 @@ module ActiveRecord @local_tz = execute('SHOW TIME ZONE', 'SCHEMA').first["TimeZone"] end + def self.visitor_for(pool) # :nodoc: + Arel::Visitors::PostgreSQL.new(pool) + end + # Clears the prepared statements cache. def clear_cache! - @statements.each_value do |value| - @connection.query "DEALLOCATE #{value}" - end @statements.clear end @@ -467,10 +521,11 @@ module ActiveRecord # Executes an INSERT query and returns the new record's ID def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil) - # Extract the table from the insert sql. Yuck. - _, table = extract_schema_and_table(sql.split(" ", 4)[2]) - - pk ||= primary_key(table) + unless pk + # Extract the table from the insert sql. Yuck. + table_ref = extract_table_ref_from_insert_sql(sql) + pk = primary_key(table_ref) if table_ref + end if pk select_value("#{sql} RETURNING #{quote_column_name(pk)}") @@ -566,9 +621,9 @@ module ActiveRecord def sql_for_insert(sql, pk, id_value, sequence_name, binds) unless pk - _, table = extract_schema_and_table(sql.split(" ", 4)[2]) - - pk = primary_key(table) + # Extract the table from the insert sql. Yuck. + table_ref = extract_table_ref_from_insert_sql(sql) + pk = primary_key(table_ref) if table_ref end sql = "#{sql} RETURNING #{quote_column_name(pk)}" if pk @@ -614,12 +669,14 @@ module ActiveRecord # SCHEMA STATEMENTS ======================================== - def recreate_database(name) #:nodoc: + # Drops the database specified on the +name+ attribute + # and creates it again using the provided +options+. + def recreate_database(name, options = {}) #:nodoc: drop_database(name) - create_database(name) + create_database(name, options) end - # Create a new PostgreSQL database. Options include <tt>:owner</tt>, <tt>:template</tt>, + # Create a new PostgreSQL database. Options include <tt>:owner</tt>, <tt>:template</tt>, # <tt>:encoding</tt>, <tt>:tablespace</tt>, and <tt>:connection_limit</tt> (note that MySQL uses # <tt>:charset</tt> while PostgreSQL uses <tt>:encoding</tt>). # @@ -666,34 +723,33 @@ module ActiveRecord SQL end + # Returns true if table exists. + # If the schema is not specified as part of +name+ then it will only find tables within + # the current schema search path (regardless of permissions to access tables in other schemas) def table_exists?(name) - schema, table = extract_schema_and_table(name.to_s) + schema, table = Utils.extract_schema_and_table(name.to_s) + return false unless table - binds = [[nil, table.gsub(/(^"|"$)/,'')]] + binds = [[nil, table]] binds << [nil, schema] if schema exec_query(<<-SQL, 'SCHEMA', binds).rows.first[0].to_i > 0 SELECT COUNT(*) - FROM pg_tables - WHERE tablename = $1 - #{schema ? "AND schemaname = $2" : ''} + FROM pg_class c + LEFT JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind in ('v','r') + AND c.relname = $1 + AND n.nspname = #{schema ? '$2' : 'ANY (current_schemas(false))'} SQL end - # Extracts the table and schema name from +name+ - def extract_schema_and_table(name) - schema, table = name.split('.', 2) - - unless table # A table was provided without a schema - table = schema - schema = nil - end - - if name =~ /^"/ # Handle quoted table names - table = name - schema = nil - end - [schema, table] + # Returns true if schema exists. + def schema_exists?(name) + exec_query(<<-SQL, 'SCHEMA', [[nil, name]]).rows.first[0].to_i > 0 + SELECT COUNT(*) + FROM pg_namespace + WHERE nspname = $1 + SQL end # Returns an array of indexes for the given table. @@ -743,6 +799,11 @@ module ActiveRecord query('select current_database()')[0][0] end + # Returns the current schema name. + def current_schema + query('SELECT current_schema', 'SCHEMA')[0][0] + end + # Returns the current database encoding format. def encoding query(<<-end_sql)[0][0] @@ -806,7 +867,7 @@ module ActiveRecord end if pk && sequence - quoted_sequence = quote_column_name(sequence) + quoted_sequence = quote_table_name(sequence) select_value <<-end_sql, 'Reset sequence' SELECT setval('#{quoted_sequence}', (SELECT COALESCE(MAX(#{quote_column_name pk})+(SELECT increment_by FROM #{quoted_sequence}), (SELECT min_value FROM #{quoted_sequence})) FROM #{quote_table_name(table)}), false) @@ -819,18 +880,25 @@ module ActiveRecord # First try looking for a sequence with a dependency on the # given table's primary key. result = exec_query(<<-end_sql, 'SCHEMA').rows.first - SELECT attr.attname, seq.relname + SELECT attr.attname, ns.nspname, seq.relname FROM pg_class seq INNER JOIN pg_depend dep ON seq.oid = dep.objid INNER JOIN pg_attribute attr ON attr.attrelid = dep.refobjid AND attr.attnum = dep.refobjsubid INNER JOIN pg_constraint cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.conkey[1] + INNER JOIN pg_namespace ns ON seq.relnamespace = ns.oid WHERE seq.relkind = 'S' AND cons.contype = 'p' AND dep.refobjid = '#{quote_table_name(table)}'::regclass end_sql # [primary_key, sequence] - [result.first, result.last] + if result.second == 'public' then + sequence = result.last + else + sequence = result.second+'.'+result.last + end + + [result.first, sequence] rescue nil end @@ -854,12 +922,14 @@ module ActiveRecord # Example: # rename_table('octopuses', 'octopi') def rename_table(name, new_name) + clear_cache! execute "ALTER TABLE #{quote_table_name(name)} RENAME TO #{quote_table_name(new_name)}" end # Adds a new column to the named table. # See TableDefinition#column for details of the options you can use. def add_column(table_name, column_name, type, options = {}) + clear_cache! add_column_sql = "ALTER TABLE #{quote_table_name(table_name)} ADD COLUMN #{quote_column_name(column_name)} #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}" add_column_options!(add_column_sql, options) @@ -868,6 +938,7 @@ module ActiveRecord # Changes the column of a table. def change_column(table_name, column_name, type, options = {}) + clear_cache! quoted_table_name = quote_table_name(table_name) execute "ALTER TABLE #{quoted_table_name} ALTER COLUMN #{quote_column_name(column_name)} TYPE #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}" @@ -878,10 +949,12 @@ module ActiveRecord # Changes the default value of a table column. def change_column_default(table_name, column_name, default) + clear_cache! execute "ALTER TABLE #{quote_table_name(table_name)} ALTER COLUMN #{quote_column_name(column_name)} SET DEFAULT #{quote(default)}" end def change_column_null(table_name, column_name, null, default = nil) + clear_cache! unless null || default.nil? execute("UPDATE #{quote_table_name(table_name)} SET #{quote_column_name(column_name)}=#{quote(default)} WHERE #{quote_column_name(column_name)} IS NULL") end @@ -890,6 +963,7 @@ module ActiveRecord # Renames a column in a table. def rename_column(table_name, column_name, new_column_name) + clear_cache! execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}" end @@ -929,13 +1003,32 @@ module ActiveRecord # Construct a clean list of column names from the ORDER BY clause, removing # any ASC/DESC modifiers - order_columns = orders.collect { |s| s =~ /^(.+)\s+(ASC|DESC)\s*$/i ? $1 : s } + order_columns = orders.collect { |s| s.gsub(/\s+(ASC|DESC)\s*/i, '') } order_columns.delete_if { |c| c.blank? } order_columns = order_columns.zip((0...order_columns.size).to_a).map { |s,i| "#{s} AS alias_#{i}" } "DISTINCT #{columns}, #{order_columns * ', '}" end + module Utils + extend self + + # Returns an array of <tt>[schema_name, table_name]</tt> extracted from +name+. + # +schema_name+ is nil if not specified in +name+. + # +schema_name+ and +table_name+ exclude surrounding quotes (regardless of whether provided in +name+) + # +name+ supports the range of schema/table references understood by PostgreSQL, for example: + # + # * <tt>table_name</tt> + # * <tt>"table.name"</tt> + # * <tt>schema_name.table_name</tt> + # * <tt>schema_name."table.name"</tt> + # * <tt>"schema.name"."table name"</tt> + def extract_schema_and_table(name) + table, schema = name.scan(/[^".\s]+|"[^"]*"/)[0..1].collect{|m| m.gsub(/(^"|"$)/,'') }.reverse + [schema, table] + end + end + protected # Returns the version of the connected PostgreSQL server. def postgresql_version @@ -954,27 +1047,55 @@ module ActiveRecord end private - def exec_no_cache(sql, binds) - @connection.async_exec(sql) - end + FEATURE_NOT_SUPPORTED = "0A000" # :nodoc: - def exec_cache(sql, binds) - unless @statements.key? sql - nextkey = "a#{@statements.length + 1}" - @connection.prepare nextkey, sql - @statements[sql] = nextkey + def exec_no_cache(sql, binds) + @connection.async_exec(sql) end - key = @statements[sql] + def exec_cache(sql, binds) + begin + stmt_key = prepare_statement sql + + # Clear the queue + @connection.get_last_result + @connection.send_query_prepared(stmt_key, binds.map { |col, val| + type_cast(val, col) + }) + @connection.block + @connection.get_last_result + rescue PGError => e + # Get the PG code for the failure. Annoyingly, the code for + # prepared statements whose return value may have changed is + # FEATURE_NOT_SUPPORTED. Check here for more details: + # http://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/utils/cache/plancache.c#l573 + code = e.result.result_error_field(PGresult::PG_DIAG_SQLSTATE) + if FEATURE_NOT_SUPPORTED == code + @statements.delete sql_key(sql) + retry + else + raise e + end + end + end - # Clear the queue - @connection.get_last_result - @connection.send_query_prepared(key, binds.map { |col, val| - type_cast(val, col) - }) - @connection.block - @connection.get_last_result - end + # Returns the statement identifier for the client side cache + # of statements + def sql_key(sql) + "#{schema_search_path}-#{sql}" + end + + # Prepare the statement if it hasn't been prepared, return + # the statement key. + def prepare_statement(sql) + sql_key = sql_key(sql) + unless @statements.key? sql_key + nextkey = @statements.next_key + @connection.prepare nextkey, sql + @statements[sql_key] = nextkey + end + @statements[sql_key] + end # The internal PostgreSQL identifier of the money data type. MONEY_COLUMN_TYPE_OID = 790 #:nodoc: @@ -1074,9 +1195,14 @@ module ActiveRecord end end - def table_definition - TableDefinition.new(self) - end + def extract_table_ref_from_insert_sql(sql) + sql[/into\s+([^\(]*).*values\s*\(/i] + $1.strip if $1 + end + + def table_definition + TableDefinition.new(self) + end end end end |