Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Bulk update of rows in Postgres DB using psycopg2

We need to do bulk updates of many rows in our Postgres DB, and want to use the SQL syntax below. How do we do that using psycopg2?

UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

Attempt 1 - Passing values

We need to pass a nested iterable format to the psycopg2 query. For the update_payload, I've tried passing a list of lists, list of tuples, and tuples of tuples. It all fails with various errors.

Attempt 2 - Writing custom class with __conform__

I've tried to write a custom class that we can use for these operations, which would return

(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

I've coded up like this following instructions here, but it's clear that I'm doing something wrong. For instance, in this approach I'll have to handle quoting of all values inside the table, which would be cumbersome and prone to errors.

class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

EDIT: If doing a bulk update can be done in a faster/cleaner way using another syntax than the one in my original question, then I'm all ears!

like image 708
pir Avatar asked Sep 20 '19 17:09

pir


1 Answers

Requirements:

  • Postgres table, consisting of the fields id and msg (and potentially other fields)
  • Python data containing new values for msg
  • Postgres table should be updated via psycopg2

Example Table

CREATE TABLE einstein(
   id CHAR(5) PRIMARY KEY,
   msg VARCHAR(1024) NOT NULL
);

Test data

INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');

Python Program

Hypothetical, self-contained example program with quotations of a famous physicist.

import sys
import psycopg2
from psycopg2.extras import execute_values


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes):
    cur = con.cursor()
    execute_values(cur, """UPDATE einstein 
                           SET msg = update_payload.msg 
                           FROM (VALUES %s) AS update_payload (id, msg) 
                           WHERE einstein.id = update_payload.id""", einstein_quotes)
    con.commit()


def main():
    con = None
    einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                       ("b", "I have no special talent. I am only passionately curious."),
                       ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes)
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()

Prepared Statements Alternative

import sys
import psycopg2
from psycopg2.extras import execute_batch


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes, page_size):
    cur = con.cursor()
    cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
    execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
    cur.execute("DEALLOCATE updateStmt")
    con.commit()


def main():
    con = None
    einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                       {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                       {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes, 100)  #choose some meaningful page_size here
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()

Output

The above program would output the following to the debug console:

a     empty
b     empty
c     empty
rows updated:
a     Few are those who see with their own eyes and feel with their own hearts.
b     I have no special talent. I am only passionately curious.
c     Life is like riding a bicycle. To keep your balance you must keep moving.
like image 78
Stephan Schlecht Avatar answered Nov 06 '22 09:11

Stephan Schlecht