I am attempting to query a subset of a MySQL database table, load the results into a pandas DataFrame, alter some data, and then write the updated rows back to the same table. The table has ~1MM rows, and the number of rows I will be altering is relatively small (<50,000), so bringing back the entire table and performing a df.to_sql(tablename, engine, if_exists='replace') isn't a viable option. Is there a straightforward way to UPDATE the rows that have been altered without iterating over every row in the DataFrame?
I am aware of this project, which attempts to simulate an "upsert" workflow, but it seems it only accomplishes the task of inserting new non-duplicate rows rather than updating parts of existing rows:
GitHub Pandas-to_sql-upsert
Here is a skeleton of what I'm attempting to accomplish on a much larger scale:
import pandas as pd
from sqlalchemy import create_engine
import threading
#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)
engine = create_engine(SQLALCHEMY_DATABASE_URI)
#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
A INTEGER,
B INTEGER,
PRIMARY KEY (A))
""")
#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)
#Read the table back and alter the row where 'A' == 2
df_in_db = pd.read_sql_table('test_upsert', engine)
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6
Now I would like to write df_in_db back to my 'test_upsert' table with the updated data reflected.
This SO question is very similar, and one of the comments recommends using an "sqlalchemy table class" to perform the task.
Update table using sqlalchemy table class
Can anyone expand on how I would implement this for my specific case above if that is the best (only?) way to implement it?
It is possible to update multiple rows with a single call, often referred to as a bulk update. Use the executemany() method of the cursor object to run one UPDATE statement against many sets of parameters.
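A minimal sketch of such a bulk update, using Python's built-in sqlite3 module as a stand-in for a MySQL driver (an assumption made only so the example runs anywhere). The table and values mirror the question's test_upsert example; with a MySQL DB-API driver the placeholder would typically be %s rather than ?.

```python
import sqlite3

# In-memory SQLite database standing in for MySQL (assumption for illustration).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE test_upsert (A INTEGER PRIMARY KEY, B INTEGER)")
cur.executemany("INSERT INTO test_upsert (A, B) VALUES (?, ?)",
                [(1, 4), (2, 3), (3, 2), (4, 1)])

# Changed rows as (new_B, A) tuples. From a DataFrame these could be built with
# something like list(changed[["B", "A"]].itertuples(index=False, name=None)).
changed_rows = [(6, 2), (9, 4)]

# One parameterized UPDATE, executed once per parameter tuple.
cur.executemany("UPDATE test_upsert SET B = ? WHERE A = ?", changed_rows)
conn.commit()

result = cur.execute("SELECT A, B FROM test_upsert ORDER BY A").fetchall()
print(result)
```

The values are bound by the driver rather than formatted into the SQL string, so quoting and escaping are not the caller's problem.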
I think the easiest way would be to first delete the rows that are going to be "upserted" and then insert the changed versions. The delete can be done in a loop, but that's not very efficient for bigger data sets (5K+ rows), so I'd save this slice of the DF into a temporary MySQL table and delete with a subquery against it:
from sqlalchemy import text
# assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask]  # `mask` should help us to find changed rows...
# make sure `x` DF has a Primary Key column as index
x = x.set_index('A')
# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)
conn = engine.connect()
trans = conn.begin()
try:
    # delete those rows that we are going to "upsert"
    conn.execute(text('delete from test_upsert where A in (select A from my_tmp)'))
    # insert changed rows on the same connection, so both steps share one transaction
    x.to_sql('test_upsert', conn, if_exists='append', index=True)
    # commit only after both the delete and the insert have succeeded
    trans.commit()
except Exception:
    trans.rollback()
    raise
PS: I didn't test this code, so it might have some small bugs, but it should give you an idea...
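The staging-table steps above can be sketched end to end without pandas or MySQL. This is only an illustration under stated assumptions: sqlite3 stands in for MySQL, the changed rows are plain (A, B) tuples instead of a DataFrame slice, and replace_rows is a hypothetical helper name, not part of any library.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_upsert (A INTEGER PRIMARY KEY, B INTEGER)")
conn.executemany("INSERT INTO test_upsert VALUES (?, ?)",
                 [(1, 4), (2, 3), (3, 2), (4, 1)])
conn.commit()

def replace_rows(conn, changed):
    """Stage `changed` (A, B) tuples, then delete and reinsert the matching rows."""
    # `with conn` commits if the block succeeds and rolls back the pending
    # inserts and deletes if any statement raises.
    with conn:
        conn.execute("DROP TABLE IF EXISTS my_tmp")
        conn.execute("CREATE TABLE my_tmp (A INTEGER PRIMARY KEY, B INTEGER)")
        conn.executemany("INSERT INTO my_tmp VALUES (?, ?)", changed)
        conn.execute("DELETE FROM test_upsert WHERE A IN (SELECT A FROM my_tmp)")
        conn.execute("INSERT INTO test_upsert (A, B) SELECT A, B FROM my_tmp")

replace_rows(conn, [(2, 6)])
rows = conn.execute("SELECT A, B FROM test_upsert ORDER BY A").fetchall()
print(rows)
```

Keeping the delete and the insert inside one transaction means a failed insert cannot leave the target table with the old rows already gone.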
A MySQL-specific solution using the "method" argument of pandas' to_sql and SQLAlchemy's MySQL insert on_duplicate_key_update feature:
import sqlalchemy as db
from sqlalchemy.dialects.mysql import insert
def create_method(meta):
    def method(table, conn, keys, data_iter):
        # reflect the target table; autoload_with replaces the older autoload=True
        # and bound MetaData, both of which SQLAlchemy 2.0 removed
        sql_table = db.Table(table.name, meta, autoload_with=conn)
        insert_stmt = insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
        # on a duplicate key, overwrite every column with the incoming value
        upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
        conn.execute(upsert_stmt)
    return method
engine = db.create_engine(...)
conn = engine.connect()
with conn.begin():
    meta = db.MetaData()
    method = create_method(meta)
    df.to_sql(table_name, conn, if_exists='append', method=method)
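To make the upsert less opaque, here is a rough sketch of the kind of SQL such a statement corresponds to. build_mysql_upsert_sql is a hypothetical helper written for illustration, not a pandas or SQLAlchemy function, and the exact text SQLAlchemy emits may differ. Unlike the answer's code, this sketch leaves the key columns out of the UPDATE clause.

```python
def build_mysql_upsert_sql(table, columns, key_columns):
    """Return an INSERT ... ON DUPLICATE KEY UPDATE statement with %s placeholders.

    Table and column names are interpolated as-is, so they must come from
    trusted code, never from user input.
    """
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Only non-key columns are reassigned when a duplicate key is hit.
    updates = ", ".join(
        f"{c} = VALUES({c})" for c in columns if c not in key_columns
    )
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )

sql = build_mysql_upsert_sql("test_upsert", ["A", "B"], ["A"])
print(sql)
```

A string like this could be passed to a MySQL DB-API cursor's executemany() together with the changed rows. Recent MySQL 8.0 releases deprecate VALUES() in this position in favor of a row alias, so treat the syntax as version-dependent.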
I struggled with this before, and I've now found a way.
Basically, create a separate DataFrame that holds only the rows you need to update.
df  # DataFrame holding only the rows to update
s_update = ""  # accumulated UPDATE statements
Loop through the DataFrame:
for i in range(len(df)):
    # values are formatted straight into the SQL text, so use this only with trusted data
    s_update += "update your_table_name set column_name = '%s' where column_name = '%s';" % (df[col_name1][i], df[col_name2][i])
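Now pass s_update to cursor.execute or engine.execute (wherever you execute your SQL queries).
This sends all of the updates in a single call. Note that some drivers only run several semicolon-separated statements in one call when multi-statement execution is enabled.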
Here is a general function that updates the table once per DataFrame row, setting all of that row's values in a single statement:
def update_table_from_df(df, table, where):
    '''Will take a dataframe and update each specified row in the SQL table
    with the DF values -- DF columns MUST match SQL columns
    WHERE statement should be triple-quoted string
    Will not update any columns contained in the WHERE statement'''
    # `cursor` is assumed to be an open database cursor defined elsewhere
    for idx, row in df.iterrows():
        assignments = []
        for col in df.columns:
            # skip the 'datetime' column and any column named in the WHERE clause
            if col == 'datetime' or col in where:
                continue
            if isinstance(row[col], str):
                # string values are quoted
                assignments.append(f"{col} = '{row[col]}'")
            else:
                assignments.append(f"{col} = {row[col]}")
        # joining avoids a dangling comma when the last column is skipped
        upstr = f"UPDATE {table} set {', '.join(assignments)} {where}"
        cursor.execute(upstr)
    # pyodbc cursors expose commit(); with other drivers, commit on the connection
    cursor.commit()
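As a hedged alternative to formatting values into the SQL string, the sketch below binds them as named parameters and matches each row on its own key. It assumes sqlite3 as a stand-in for MySQL and takes the rows as a list of dicts (which could come from something like df.to_dict("records")). update_rows is a hypothetical helper name.

```python
import sqlite3

def update_rows(conn, table, key, rows):
    """Update `table` from a list of dicts, matching each dict on column `key`."""
    if not rows:
        return
    cols = [c for c in rows[0] if c != key]
    assignments = ", ".join(f"{c} = :{c}" for c in cols)
    # Table and column names are interpolated and must be trusted;
    # the values themselves are bound by the driver as named parameters.
    sql = f"UPDATE {table} SET {assignments} WHERE {key} = :{key}"
    with conn:  # commit on success, roll back on error
        conn.executemany(sql, rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_upsert (A INTEGER PRIMARY KEY, B INTEGER)")
conn.executemany("INSERT INTO test_upsert VALUES (?, ?)",
                 [(1, 4), (2, 3), (3, 2), (4, 1)])

update_rows(conn, "test_upsert", "A", [{"A": 2, "B": 6}, {"A": 3, "B": 0}])
final = conn.execute("SELECT A, B FROM test_upsert ORDER BY A").fetchall()
print(final)
```

Because each dict carries its own key value, every row updates only its matching record, and string values need no manual quoting.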