I have an importer which takes a list of emails and saves them into a PostgreSQL database. Here is a snippet of code from a tableless importer class:
query_temporary_table = "CREATE TEMPORARY TABLE subscriber_imports (email CHARACTER VARYING(255)) ON COMMIT DROP;"
query_copy = "COPY subscriber_imports(email) FROM STDIN WITH CSV;"
query_delete = "DELETE FROM subscriber_imports WHERE email IN (SELECT email FROM subscribers WHERE suppressed_at IS NOT NULL OR list_id = #{list.id}) RETURNING email;"
query_insert = "INSERT INTO subscribers(email, list_id, created_at, updated_at) SELECT email, #{list.id}, NOW(), NOW() FROM subscriber_imports RETURNING id;"
conn = ActiveRecord::Base.connection_pool.checkout
conn.transaction do
  raw = conn.raw_connection
  raw.exec(query_temporary_table)
  raw.exec(query_copy)
  CSV.read(csv.path, headers: true).each do |row|
    raw.put_copy_data row['email']+"\n" unless row.nil?
  end
  raw.put_copy_end
  while res = raw.get_result do; end # very important to do this after a copy
  result_delete = raw.exec(query_delete)
  result_insert = raw.exec(query_insert)
  ActiveRecord::Base.connection_pool.checkin(conn)
  {
    deleted: result_delete.count,
    inserted: result_insert.count,
    updated: 0
  }
end
The issue I am having is that when I try to upload I get an exception:
PG::ERROR: another command is already in progress: ROLLBACK
This all happens in a single action. The only other queries I make are for user validation, and I have a DB mutex preventing overlapping imports. This code worked fine until my latest push, which updated my pg gem from 0.13.2 to 0.14.1 (along with other "unrelated" code).
The error initially started on our staging server, but I was then able to reproduce it locally and am out of ideas.
If I need to be more clear with my question, let me know.
Thanks
I found my own answer, and it might be useful to anyone who hits the same issue when importing large amounts of data using "COPY".
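An exception was being thrown within the CSV.read() block. I do catch it, but I was not ending the COPY correctly: the connection was still in COPY mode when the transaction tried to issue its ROLLBACK, which is why PostgreSQL complained that another command was already in progress.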
begin
  CSV.read(csv.path, headers: true).each do |row|
    raw.put_copy_data row['email']+"\n" unless row.nil?
  end
ensure
  raw.put_copy_end
  while res = raw.get_result do; end # very important to do this after a copy
end
The ensure clause guarantees that the COPY command is always completed, even when an exception is raised. I also added this at the end to release the connection back into the pool, without disrupting the flow in the case of a successful import:
rescue
  ActiveRecord::Base.connection_pool.checkin(conn)
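For anyone who wants the cleanup in one reusable place, here is a minimal sketch of the same idea as a helper. The method name stream_copy and its lines argument are hypothetical names of my own, not part of the pg gem; raw is assumed to respond to put_copy_data, put_copy_end and get_result the way a PG::Connection does, and a COPY ... FROM STDIN is assumed to have been started on it already.

```ruby
# Hypothetical helper (not part of the pg gem): streams each line into an
# already-started COPY ... FROM STDIN and always finishes the COPY.
# `raw` is assumed to behave like PG::Connection; `lines` is any
# enumerable of strings without trailing newlines.
def stream_copy(raw, lines)
  sent = 0
  begin
    lines.each do |line|
      raw.put_copy_data("#{line}\n")
      sent += 1
    end
  ensure
    # Runs on success and on failure, so the connection is never left
    # in COPY mode when the surrounding transaction commits or rolls back.
    raw.put_copy_end
    # Drain pending results; get_result returns nil once nothing is left.
    while raw.get_result
      # discard
    end
  end
  sent # number of lines handed to put_copy_data
end
```

With the code above, you would call raw.exec(query_copy) first and then pass the email strings from the CSV rows to the helper. Any exception still propagates to the caller, so the transaction rolls back as before. For the check-in, an ensure clause around the whole import (rather than a rescue) should work too, since ensure does not change the block's return value. If I remember correctly, later releases of the pg gem also provide PG::Connection#copy_data, which wraps this cleanup in a block, so it may be worth checking the documentation for your version.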