Merge pull request #5698 from dougcole/support_postgresql_partitioning
Support postgresql partitioning by making INSERT RETURNING optional
This commit is contained in:
commit
8de4d71f5d
@ -59,7 +59,7 @@ def exec_query(sql, name = 'SQL', binds = [])
|
||||
# Executes insert +sql+ statement in the context of this connection using
|
||||
# +binds+ as the bind substitutes. +name+ is logged along with
|
||||
# the executed +sql+ statement.
|
||||
def exec_insert(sql, name, binds)
|
||||
def exec_insert(sql, name, binds, pk = nil, sequence_name = nil)
|
||||
exec_query(sql, name, binds)
|
||||
end
|
||||
|
||||
@ -87,7 +87,7 @@ def exec_update(sql, name, binds)
|
||||
# passed in as +id_value+.
|
||||
def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
|
||||
sql, binds = sql_for_insert(to_sql(arel, binds), pk, id_value, sequence_name, binds)
|
||||
value = exec_insert(sql, name, binds)
|
||||
value = exec_insert(sql, name, binds, pk, sequence_name)
|
||||
id_value || last_inserted_id(value)
|
||||
end
|
||||
|
||||
|
@ -226,7 +226,7 @@ def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil)
|
||||
end
|
||||
alias :create :insert_sql
|
||||
|
||||
def exec_insert(sql, name, binds)
|
||||
def exec_insert(sql, name, binds, pk = nil, sequence_name = nil)
|
||||
execute to_sql(sql, binds), name
|
||||
end
|
||||
|
||||
|
@ -17,7 +17,7 @@ def postgresql_connection(config) # :nodoc:
|
||||
# Forward any unused config params to PGconn.connect.
|
||||
[:statement_limit, :encoding, :min_messages, :schema_search_path,
|
||||
:schema_order, :adapter, :pool, :wait_timeout, :template,
|
||||
:reaping_frequency].each do |key|
|
||||
:reaping_frequency, :insert_returning].each do |key|
|
||||
conn_params.delete key
|
||||
end
|
||||
conn_params.delete_if { |k,v| v.nil? }
|
||||
@ -258,6 +258,8 @@ def simplified_type(field_type)
|
||||
# <encoding></tt> call on the connection.
|
||||
# * <tt>:min_messages</tt> - An optional client min messages that is used in a
|
||||
# <tt>SET client_min_messages TO <min_messages></tt> call on the connection.
|
||||
# * <tt>:insert_returning</tt> - An optional boolean to control the use or <tt>RETURNING</tt> for <tt>INSERT<tt> statements
|
||||
# defaults to true.
|
||||
#
|
||||
# Any further options are used as connection parameters to libpq. See
|
||||
# http://www.postgresql.org/docs/9.1/static/libpq-connect.html for the
|
||||
@ -405,6 +407,7 @@ def initialize(connection, logger, connection_parameters, config)
|
||||
|
||||
initialize_type_map
|
||||
@local_tz = execute('SHOW TIME ZONE', 'SCHEMA').first["TimeZone"]
|
||||
@use_insert_returning = @config.key?(:insert_returning) ? @config[:insert_returning] : true
|
||||
end
|
||||
|
||||
# Clears the prepared statements cache.
|
||||
@ -666,8 +669,11 @@ def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil)
|
||||
pk = primary_key(table_ref) if table_ref
|
||||
end
|
||||
|
||||
if pk
|
||||
if pk && use_insert_returning?
|
||||
select_value("#{sql} RETURNING #{quote_column_name(pk)}")
|
||||
elsif pk
|
||||
super
|
||||
last_insert_id_value(sequence_name || default_sequence_name(table_ref, pk))
|
||||
else
|
||||
super
|
||||
end
|
||||
@ -782,11 +788,27 @@ def sql_for_insert(sql, pk, id_value, sequence_name, binds)
|
||||
pk = primary_key(table_ref) if table_ref
|
||||
end
|
||||
|
||||
sql = "#{sql} RETURNING #{quote_column_name(pk)}" if pk
|
||||
if pk && use_insert_returning?
|
||||
sql = "#{sql} RETURNING #{quote_column_name(pk)}"
|
||||
end
|
||||
|
||||
[sql, binds]
|
||||
end
|
||||
|
||||
def exec_insert(sql, name, binds, pk = nil, sequence_name = nil)
|
||||
val = exec_query(sql, name, binds)
|
||||
if !use_insert_returning? && pk
|
||||
unless sequence_name
|
||||
table_ref = extract_table_ref_from_insert_sql(sql)
|
||||
sequence_name = default_sequence_name(table_ref, pk)
|
||||
return val unless sequence_name
|
||||
end
|
||||
last_insert_id_result(sequence_name)
|
||||
else
|
||||
val
|
||||
end
|
||||
end
|
||||
|
||||
# Executes an UPDATE query and returns the number of affected tuples.
|
||||
def update_sql(sql, name = nil)
|
||||
super.cmd_tuples
|
||||
@ -1027,7 +1049,9 @@ def client_min_messages=(level)
|
||||
|
||||
# Returns the sequence name for a table's primary key or some other specified key.
|
||||
def default_sequence_name(table_name, pk = nil) #:nodoc:
|
||||
serial_sequence(table_name, pk || 'id').split('.').last
|
||||
result = serial_sequence(table_name, pk || 'id')
|
||||
return nil unless result
|
||||
result.split('.').last
|
||||
rescue ActiveRecord::StatementInvalid
|
||||
"#{table_name}_#{pk || 'id'}_seq"
|
||||
end
|
||||
@ -1235,6 +1259,10 @@ def extract_schema_and_table(name)
|
||||
end
|
||||
end
|
||||
|
||||
def use_insert_returning?
|
||||
@use_insert_returning
|
||||
end
|
||||
|
||||
protected
|
||||
# Returns the version of the connected PostgreSQL server.
|
||||
def postgresql_version
|
||||
@ -1364,8 +1392,15 @@ def configure_connection
|
||||
|
||||
# Returns the current ID of a table's sequence.
|
||||
def last_insert_id(sequence_name) #:nodoc:
|
||||
r = exec_query("SELECT currval($1)", 'SQL', [[nil, sequence_name]])
|
||||
Integer(r.rows.first.first)
|
||||
Integer(last_insert_id_value(sequence_name))
|
||||
end
|
||||
|
||||
def last_insert_id_value(sequence_name)
|
||||
last_insert_id_result(sequence_name).rows.first.first
|
||||
end
|
||||
|
||||
def last_insert_id_result(sequence_name) #:nodoc:
|
||||
exec_query("SELECT currval($1)", 'SQL', [[nil, sequence_name]])
|
||||
end
|
||||
|
||||
# Executes a SELECT query and returns the results, performing any data type
|
||||
|
@ -49,6 +49,33 @@ def test_insert_sql_with_no_space_after_table_name
|
||||
assert_equal expect, id
|
||||
end
|
||||
|
||||
def test_insert_sql_with_returning_disabled
|
||||
connection = connection_without_insert_returning
|
||||
id = connection.insert_sql("insert into postgresql_partitioned_table_parent (number) VALUES (1)")
|
||||
expect = connection.query('select max(id) from postgresql_partitioned_table_parent').first.first
|
||||
assert_equal expect, id
|
||||
end
|
||||
|
||||
def test_exec_insert_with_returning_disabled
|
||||
connection = connection_without_insert_returning
|
||||
result = connection.exec_insert("insert into postgresql_partitioned_table_parent (number) VALUES (1)", nil, [], 'id', 'postgresql_partitioned_table_parent_id_seq')
|
||||
expect = connection.query('select max(id) from postgresql_partitioned_table_parent').first.first
|
||||
assert_equal expect, result.rows.first.first
|
||||
end
|
||||
|
||||
def test_exec_insert_with_returning_disabled_and_no_sequence_name_given
|
||||
connection = connection_without_insert_returning
|
||||
result = connection.exec_insert("insert into postgresql_partitioned_table_parent (number) VALUES (1)", nil, [], 'id')
|
||||
expect = connection.query('select max(id) from postgresql_partitioned_table_parent').first.first
|
||||
assert_equal expect, result.rows.first.first
|
||||
end
|
||||
|
||||
def test_sql_for_insert_with_returning_disabled
|
||||
connection = connection_without_insert_returning
|
||||
result = connection.sql_for_insert('sql', nil, nil, nil, 'binds')
|
||||
assert_equal ['sql', 'binds'], result
|
||||
end
|
||||
|
||||
def test_serial_sequence
|
||||
assert_equal 'public.accounts_id_seq',
|
||||
@connection.serial_sequence('accounts', 'id')
|
||||
@ -204,6 +231,10 @@ def insert(ctx, data)
|
||||
|
||||
ctx.exec_insert(sql, 'SQL', binds)
|
||||
end
|
||||
|
||||
def connection_without_insert_returning
|
||||
ActiveRecord::Base.postgresql_connection(ActiveRecord::Base.configurations['arunit'].merge(:insert_returning => false))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -1,8 +1,8 @@
|
||||
ActiveRecord::Schema.define do
|
||||
|
||||
%w(postgresql_tsvectors postgresql_hstores postgresql_arrays postgresql_moneys postgresql_numbers postgresql_times postgresql_network_addresses postgresql_bit_strings
|
||||
postgresql_oids postgresql_xml_data_type defaults geometrics postgresql_timestamp_with_zones).each do |table_name|
|
||||
execute "DROP TABLE IF EXISTS #{quote_table_name table_name}"
|
||||
postgresql_oids postgresql_xml_data_type defaults geometrics postgresql_timestamp_with_zones postgresql_partitioned_table postgresql_partitioned_table_parent).each do |table_name|
|
||||
execute "DROP TABLE IF EXISTS #{quote_table_name table_name}"
|
||||
end
|
||||
|
||||
execute 'DROP SEQUENCE IF EXISTS companies_nonstd_seq CASCADE'
|
||||
@ -10,6 +10,8 @@
|
||||
execute "ALTER TABLE companies ALTER COLUMN id SET DEFAULT nextval('companies_nonstd_seq')"
|
||||
execute 'DROP SEQUENCE IF EXISTS companies_id_seq'
|
||||
|
||||
execute 'DROP FUNCTION IF EXISTS partitioned_insert_trigger()'
|
||||
|
||||
%w(accounts_id_seq developers_id_seq projects_id_seq topics_id_seq customers_id_seq orders_id_seq).each do |seq_name|
|
||||
execute "SELECT setval('#{seq_name}', 100)"
|
||||
end
|
||||
@ -125,6 +127,29 @@
|
||||
);
|
||||
_SQL
|
||||
|
||||
execute <<_SQL
|
||||
CREATE TABLE postgresql_partitioned_table_parent (
|
||||
id SERIAL PRIMARY KEY,
|
||||
number integer
|
||||
);
|
||||
CREATE TABLE postgresql_partitioned_table ( )
|
||||
INHERITS (postgresql_partitioned_table_parent);
|
||||
|
||||
CREATE OR REPLACE FUNCTION partitioned_insert_trigger()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
INSERT INTO postgresql_partitioned_table VALUES (NEW.*);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER insert_partitioning_trigger
|
||||
BEFORE INSERT ON postgresql_partitioned_table_parent
|
||||
FOR EACH ROW EXECUTE PROCEDURE partitioned_insert_trigger();
|
||||
_SQL
|
||||
|
||||
|
||||
begin
|
||||
execute <<_SQL
|
||||
CREATE TABLE postgresql_xml_data_type (
|
||||
|
Loading…
Reference in New Issue
Block a user