
Is there a way to test an SQLAlchemy Connection?

I'm using SQLAlchemy to write a pandas DataFrame to a MySQL database. Early on in my code I create an SQLAlchemy engine:

engine = create_my_sqlalchemy_connection()

I execute some queries, do some calculations, and then try to use that same engine to write to the database a little later:

df.to_sql('my_table', engine, if_exists='append', index=False)

Sometimes this works, and sometimes the connection is lost by the time the code is ready to write to the DB, and there is an error.

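I could wrap the write in a try/except and create a new connection if needed:

try:
    df.to_sql('my_table', engine, if_exists='append', index=False)
except Exception:  # a bare except would also swallow KeyboardInterrupt
    # Rebuild the engine and retry the write once
    engine = create_my_sqlalchemy_connection()
    df.to_sql('my_table', engine, if_exists='append', index=False)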

However, I thought I'd reach out and see if anyone knows of a better way (e.g. if there is some SQLAlchemy method that I am unaware of for testing to see if the connection still exists).

mgig asked Oct 04 '15 02:10


3 Answers

You can have SQLAlchemy check for the liveness of the connection with the parameter pool_pre_ping: https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.pool_pre_ping

if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout.

Simply enable it by passing pool_pre_ping=True when you create your engine.
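As a minimal sketch of what that looks like: the URL below is an assumption (an in-memory SQLite database, used only so the snippet runs without a server). For the MySQL setup in the question you would substitute your own URL, something like mysql+pymysql://user:password@host:3306/dbname.

```python
from sqlalchemy import create_engine, text

# Assumption: in-memory SQLite stands in for the real MySQL URL.
DB_URL = "sqlite://"

# pool_pre_ping=True makes the pool test each connection when it is checked
# out and transparently replace it if the test fails.
engine = create_engine(DB_URL, pool_pre_ping=True)

# Any later use of the engine (including df.to_sql) goes through the same pool.
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()

print(result)
```

With this in place, the engine created early in the script can be reused for the later write without a manual reconnect.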

user787267 answered Sep 23 '22 16:09


If you get timeouts when writing a pandas DataFrame to a SQL server, your DataFrame is probably quite large, or the database has many constraints to check on each insert.
To get around this, set the chunksize argument of the pandas command:
DataFrame.to_sql(name, con, schema=None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype=None)

chunksize : int, optional
Rows will be written in batches of this size at a time. By default, all rows will be written at once.

I don't know how many rows you have, but 10000 is probably a fine value. The drawback is that if a write fails partway through, some rows will have been inserted but not all =( and you won't know which.
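A hedged sketch of a chunked write follows. The table name, the sample data, and the in-memory SQLite URL are all assumptions made so the snippet runs on its own. Passing a connection from engine.begin() is one possible way to soften the partial-write problem: on a transactional backend (for MySQL, an InnoDB table that already exists) all chunks should then commit or roll back together.

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Assumption: in-memory SQLite stands in for the real MySQL URL.
engine = create_engine("sqlite://")

# Hypothetical sample data, large enough to span several chunks.
df = pd.DataFrame({"id": range(25_000), "value": [i * 0.5 for i in range(25_000)]})

# engine.begin() opens one transaction: it commits when the block exits
# normally and rolls back if any chunk raises.
with engine.begin() as conn:
    df.to_sql("my_table", conn, if_exists="append", index=False, chunksize=10_000)

# Read the row count back to confirm that every chunk was written.
with engine.connect() as conn:
    row_count = conn.execute(text("SELECT COUNT(*) FROM my_table")).scalar()

print(row_count)
```

Note that on MySQL a CREATE TABLE issued by to_sql causes an implicit commit, so the all-or-nothing behaviour only holds when appending to a table that is already there.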

grofte answered Sep 25 '22 16:09


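You can test the connection by reflecting a table whose name you know. I always use this method.

Note: I tested this connection string against a Docker container.

import sqlalchemy

# user, passwd, host, port and db are your own connection settings
connection_str = f'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}'

engine = sqlalchemy.create_engine(connection_str)

# Opening a connection raises immediately if the server is unreachable
with engine.connect():
    pass

# Works on SQLAlchemy 1.x. On 2.0+, MetaData(bind=...) was removed:
# use sqlalchemy.MetaData() and metadata.reflect(bind=engine, only=[...])
metadata = sqlalchemy.MetaData(bind=engine)
metadata.reflect(only=['test_table'])

print(metadata.tables)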

Expected output if the connection is OK and the table name is correct:

FacadeDict({'test_table': Table('test_table', MetaData(bind=Engine(mysql+pymysql://root:***@localhost:3306/test_db)), Column('id', INTEGER(), table=<test_table>, primary_key=True, nullable=False), Column('DATAORA', VARCHAR(length=100), table=<test_table>), Column('SPORT', VARCHAR(length=100), table=<test_table>), Column('PARTITA', VARCHAR(length=100), table=<test_table>), Column('NAZIONE', VARCHAR(length=100), table=<test_table>), Column('SCOMMESSA', VARCHAR(length=100), table=<test_table>), Column('RATING', VARCHAR(length=100), table=<test_table>), Column('BOOKMAKER', VARCHAR(length=100), table=<test_table>), Column('QUOTA1', VARCHAR(length=100), table=<test_table>), Column('EXCHANGE', VARCHAR(length=100), table=<test_table>), Column('QUOTA2', VARCHAR(length=100), table=<test_table>), Column('LIQUIDITA', VARCHAR(length=100), table=<test_table>), schema=None)})

Output if anything is wrong (here the table name was changed from test_table to test_tables):

sqlalchemy.exc.InvalidRequestError: Could not reflect: requested table(s) not available in Engine(mysql+pymysql://root:***@localhost:3306/test_db): (test_tables)
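
If you don't know any table name, a lighter check is to run a trivial query. This is only a sketch: connection_is_alive is a hypothetical helper name, and the in-memory SQLite URL is an assumption used so the snippet runs without a server.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError


def connection_is_alive(engine):
    """Return True if a trivial query succeeds on a connection from the engine."""
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True
    except SQLAlchemyError:
        # Connection and query failures raised through SQLAlchemy land here.
        return False


# Assumption: in-memory SQLite stands in for the real MySQL URL.
engine = create_engine("sqlite://")
alive = connection_is_alive(engine)
print(alive)
```

Unlike the reflection approach, this does not verify that a particular table exists, only that the server answers.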
Ömer ÇELEBİ answered Sep 23 '22 16:09