I've loaded some data and modified one column in the dataframe and would like to update the DB to reflect the changes.
I tried:
db.session.query(sqlTableName).update({sqlTableName.sql_col_name: pdDataframe.pd_col_name})
But that just wiped out the column in the database (every value was set to '0', the default). I tried a few other data formats with no luck. I'm guessing that I've mixed up the datatypes somewhere, or that you just aren't allowed to update a column directly from a variable like this.
I could do this with a loop but... that would be genuinely awful. Sorry for the basic question; after a long break from the project, my grasp of SQLAlchemy has certainly waned.
Update table elements in SQLAlchemy. Get the books table from the MetaData object initialized while connecting to the database. Pass the update statement to the execute() function. An UPDATE does not return rows itself, so to inspect the changes, execute a SELECT afterwards, get all the results using the fetchall() function, and use a for loop to iterate through them.
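The steps described above can be sketched roughly as follows. This is only an illustration, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database; the books table, its columns, and its values are made up for the example.

```python
import sqlalchemy as sa

# In-memory SQLite engine, used here only so the sketch is self-contained.
engine = sa.create_engine("sqlite://")

# Hypothetical "books" table with two rows.
with engine.begin() as conn:
    conn.exec_driver_sql(
        "CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT, price INTEGER)"
    )
    conn.exec_driver_sql(
        "INSERT INTO books (id, title, price) VALUES (1, 'A', 10), (2, 'B', 20)"
    )

# Reflect the existing tables into a MetaData object and pick out "books".
metadata = sa.MetaData()
metadata.reflect(bind=engine)
books = metadata.tables["books"]

with engine.begin() as conn:
    # The UPDATE itself returns no rows.
    conn.execute(sa.update(books).where(books.c.id == 2).values(price=25))
    # A follow-up SELECT is what fetchall() is called on.
    results = conn.execute(sa.select(books).order_by(books.c.id)).fetchall()

# Iterate through the fetched rows.
for row in results:
    print(tuple(row))
```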
Pandas uses the SQLAlchemy library to connect to most databases and perform database operations (SQLite can also be reached through the standard sqlite3 module).
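As a small illustration of pandas working through a SQLAlchemy engine, the sketch below writes a DataFrame to a table and reads it back. It assumes an in-memory SQLite database and pandas and SQLAlchemy versions that are compatible with each other; the table name demo and its contents are hypothetical.

```python
import pandas as pd
import sqlalchemy as sa

# In-memory SQLite engine, used here only for demonstration.
engine = sa.create_engine("sqlite://")

# Hypothetical data; "demo" is a made-up table name.
df = pd.DataFrame({"id": [1, 2], "txt": ["foo", "bar"]})

# pandas uses the engine to create the table and insert the rows.
df.to_sql("demo", engine, index=False, if_exists="replace")

# pandas uses the same engine to run the query and build a DataFrame.
df_back = pd.read_sql_query("SELECT id, txt FROM demo ORDER BY id", engine)
print(df_back)
```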
The select() method of a Table object constructs a SELECT expression. Executing that expression returns a result object, which is the equivalent of a cursor in DBAPI, and records can then be fetched from it using the fetchone() method. Note that the same kind of select object can also be obtained from the select() function in sqlalchemy.
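A minimal sketch of both ways of building the SELECT, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database. The students table and its rows are hypothetical.

```python
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory database for the example
metadata = sa.MetaData()

# Hypothetical table used only for illustration.
students = sa.Table(
    "students",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(50)),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        sa.insert(students),
        [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}],
    )

# The same SELECT built via the Table method and via the select() function.
stmt_method = students.select().order_by(students.c.id)
stmt_function = sa.select(students).order_by(students.c.id)

with engine.connect() as conn:
    result = conn.execute(stmt_method)  # cursor-like result object
    first = result.fetchone()           # first row
    second = result.fetchone()          # second row
    third = result.fetchone()           # None once the rows are exhausted
    all_rows = conn.execute(stmt_function).fetchall()
```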
To upload the DataFrame to a temporary table and then perform an UPDATE, you don't need to write the SQL yourself; you can have SQLAlchemy Core do it for you:
import pandas as pd
import sqlalchemy as sa


def update_table_columns_from_df(engine, df, table_name, cols_to_update):
    # Reflect the target table and collect its primary key column names.
    metadata = sa.MetaData()
    main_table = sa.Table(table_name, metadata, autoload_with=engine)
    pk_columns = [x.name for x in main_table.primary_key.columns]

    # Upload the DataFrame to a staging table, then reflect that table too.
    df.to_sql("temp_table", engine, index=False, if_exists="replace")
    temp_table = sa.Table("temp_table", metadata, autoload_with=engine)

    with engine.begin() as conn:
        values_clause = {x: temp_table.columns[x] for x in cols_to_update}
        # Match rows on every primary key column.
        where_clause = sa.and_(
            *(main_table.columns[x] == temp_table.columns[x] for x in pk_columns)
        )
        conn.execute(
            main_table.update().values(values_clause).where(where_clause)
        )
    temp_table.drop(engine)
if __name__ == "__main__":
    test_engine = sa.create_engine(
        "postgresql+psycopg2://scott:[email protected]/test",
        echo=True,  # (for demonstration purposes)
    )
    with test_engine.begin() as test_conn:
        test_conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        test_conn.exec_driver_sql(
            """\
            CREATE TABLE main_table (
                id1 integer NOT NULL,
                id2 integer NOT NULL,
                txt1 varchar(50),
                txt2 varchar(50),
                CONSTRAINT main_table_pkey PRIMARY KEY (id1, id2)
            )
            """
        )
        test_conn.exec_driver_sql(
            """\
            INSERT INTO main_table (id1, id2, txt1, txt2)
            VALUES (1, 1, 'foo', 'x'), (1, 2, 'bar', 'y'), (1, 3, 'baz', 'z')
            """
        )
    df_updates = pd.DataFrame(
        [
            (1, 1, "new_foo", "new_x"),
            (1, 3, "new_baz", "new_z"),
        ],
        columns=["id1", "id2", "txt1", "txt2"],
    )
    update_table_columns_from_df(
        test_engine, df_updates, "main_table", ["txt1", "txt2"]
    )
    """SQL emitted:
    UPDATE main_table
    SET txt1=temp_table.txt1, txt2=temp_table.txt2
    FROM temp_table
    WHERE main_table.id1 = temp_table.id1 AND main_table.id2 = temp_table.id2
    """
    df_result = pd.read_sql_query(
        "SELECT * FROM main_table ORDER BY id1, id2", test_engine
    )
    print(df_result)
    """
       id1  id2     txt1   txt2
    0    1    1  new_foo  new_x
    1    1    2      bar      y
    2    1    3  new_baz  new_z
    """
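If a temporary table is not wanted, another option (a different technique from the one above, shown only as a hedged sketch) is to build one UPDATE with bound parameters and pass every DataFrame row to a single execute() call, so the driver runs it in "executemany" style instead of looping in your own code. The example assumes SQLAlchemy 1.4 or later and an in-memory SQLite database; the demo table and the b_id and b_txt parameter names are made up. The parameters carry a prefix because bound parameter names used this way should not clash with the table's column names.

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory database for the example
metadata = sa.MetaData()

# Hypothetical table used only for illustration.
demo = sa.Table(
    "demo",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("txt", sa.String(50)),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        sa.insert(demo),
        [{"id": 1, "txt": "foo"}, {"id": 2, "txt": "bar"}, {"id": 3, "txt": "baz"}],
    )

# DataFrame holding the new values for two of the three rows.
df = pd.DataFrame({"id": [1, 3], "txt": ["new_foo", "new_baz"]})

# One UPDATE statement with named bound parameters.
stmt = (
    sa.update(demo)
    .where(demo.c.id == sa.bindparam("b_id"))
    .values(txt=sa.bindparam("b_txt"))
)

# One parameter dictionary per DataFrame row, keyed by the bindparam names.
params = df.rename(columns={"id": "b_id", "txt": "b_txt"}).to_dict("records")

with engine.begin() as conn:
    conn.execute(stmt, params)

with engine.connect() as conn:
    rows = conn.execute(
        sa.select(demo.c.id, demo.c.txt).order_by(demo.c.id)
    ).all()
rows = [tuple(r) for r in rows]
print(rows)
```

For large DataFrames the temporary-table approach is likely to be faster, since the database performs a single set-based UPDATE rather than one statement execution per row.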