I'm using SQLAlchemy to write a pandas DataFrame to a MySQL database. Early on in my code I create a SQLAlchemy engine:
engine = create_my_sqlalchemy_connection()
I execute some queries, do some calculations, and then try to use that same engine to write to the database a little later:
df.to_sql('my_table', engine, if_exists='append', index=False)
Sometimes this works, and sometimes the connection is lost by the time the code is ready to write to the DB, and there is an error.
I could wrap the call in a try/except and create a new connection if needed:
try:
    df.to_sql('my_table', engine, if_exists='append', index=False)
except Exception:
    engine = create_my_sqlalchemy_connection()
    df.to_sql('my_table', engine, if_exists='append', index=False)
However, I thought I'd reach out and see if anyone knows of a better way (e.g. if there is some SQLAlchemy method that I am unaware of for testing to see if the connection still exists).
The connect() method returns a Connection object, and by using it in a Python context manager (i.e. a with: statement) the Connection.close() method is automatically invoked at the end of the block.
SQLAlchemy includes several connection pool implementations that integrate with the Engine. They can also be used directly by applications that want to add pooling to an otherwise plain DBAPI approach.
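As a minimal sketch of that pattern (an in-memory SQLite database is used here purely so the snippet runs standalone; in real code you would use your MySQL engine):

```python
import sqlalchemy

# Stand-in engine for illustration; swap in your MySQL connection string.
engine = sqlalchemy.create_engine('sqlite://')

# The Connection is closed (returned to the pool) automatically when
# the with: block exits, even if an exception is raised inside it.
with engine.connect() as conn:
    value = conn.execute(sqlalchemy.text('SELECT 1')).scalar()
    print(value)  # 1
```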
You can have SQLAlchemy check the liveness of the connection with the pool_pre_ping parameter: https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.pool_pre_ping
if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout.
Simply enable it when you create your engine.
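A short sketch (an in-memory SQLite database stands in for your MySQL server so the snippet runs standalone):

```python
import sqlalchemy

# pool_pre_ping tests each pooled connection with a lightweight "ping"
# on checkout and transparently replaces stale connections before use.
# Point the URL at your MySQL server in real code.
engine = sqlalchemy.create_engine('sqlite://', pool_pre_ping=True)

with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text('SELECT 1')).scalar())  # 1
```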
If you have problems with timeouts when writing a pandas DataFrame to a SQL server, your DataFrame is probably quite large, or there are many constraints the database has to check on each insert.
To get around this, set the chunksize argument of the pandas method: DataFrame.to_sql(name, con, schema=None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype=None)
chunksize : int, optional
Rows will be written in batches of this size at a time. By default, all rows will be written at once.
I don't know how many rows you have, but 10000 is probably a fine value. The drawback is that if the write fails partway through, some rows will have been inserted but not all, and you won't know which.
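A sketch of a chunked write (table name and data are illustrative, and an in-memory SQLite database stands in for the MySQL engine so the snippet is self-contained):

```python
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine('sqlite://')  # stand-in for your MySQL engine

df = pd.DataFrame({'a': range(25000), 'b': range(25000)})

# Write in batches of 10000 rows instead of one huge INSERT,
# which reduces the chance of a timeout on large frames.
df.to_sql('my_table', engine, if_exists='append', index=False, chunksize=10000)

print(pd.read_sql('SELECT COUNT(*) AS n FROM my_table', engine)['n'][0])  # 25000
```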
You can test the connection if you know a table name; I always use this method. Note: I tested this connection_str against a Docker container.
import sqlalchemy

connection_str = f'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}'
engine = sqlalchemy.create_engine(connection_str)
engine.connect()  # raises if the server is unreachable

# Reflect only the table we care about; this raises InvalidRequestError
# if the table does not exist.
metadata = sqlalchemy.MetaData(bind=engine)
metadata.reflect(only=['test_table'])
print(metadata.tables)
Correct output, if the connection is OK and the table name is correct:
FacadeDict({'test_table': Table('test_table', MetaData(bind=Engine(mysql+pymysql://root:***@localhost:3306/test_db)), Column('id', INTEGER(), table=<test_table>, primary_key=True, nullable=False), Column('DATAORA', VARCHAR(length=100), table=<test_table>), Column('SPORT', VARCHAR(length=100), table=<test_table>), Column('PARTITA', VARCHAR(length=100), table=<test_table>), Column('NAZIONE', VARCHAR(length=100), table=<test_table>), Column('SCOMMESSA', VARCHAR(length=100), table=<test_table>), Column('RATING', VARCHAR(length=100), table=<test_table>), Column('BOOKMAKER', VARCHAR(length=100), table=<test_table>), Column('QUOTA1', VARCHAR(length=100), table=<test_table>), Column('EXCHANGE', VARCHAR(length=100), table=<test_table>), Column('QUOTA2', VARCHAR(length=100), table=<test_table>), Column('LIQUIDITA', VARCHAR(length=100), table=<test_table>), schema=None)})
Wrong output, if anything is wrong (e.g. table name changed from test_table to test_tables):
sqlalchemy.exc.InvalidRequestError: Could not reflect: requested table(s) not available in Engine(mysql+pymysql://root:***@localhost:3306/test_db): (test_tables)