Hi I have a setup where I'm using Celery Flask SqlAlchemy and I am intermittently getting this error:
(psycopg2.DatabaseError) SSL error: decryption failed or bad record mac
I followed this post:
Celery + SQLAlchemy : DatabaseError: (DatabaseError) SSL error: decryption failed or bad record mac
and also a few more and added a prerun and postrun methods:
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
d.session.remove()
@task_prerun.connect
def on_task_init(*args, **kwargs):
d.engine.dispose()
But I'm still seeing this error. Anyone solved this?
Note that I'm running this on AWS (with two servers accessing same database). Database itself is hosted on it's own server (not RDS). I believe the total celery background tasks running are 6 (2+4). Flask frontend is running using gunicorn.
My related thread: https://github.com/celery/celery/issues/3238#issuecomment-225975220
Here is my comment along with additional information:
I use Celery, SQLAlchemy and PostgreSQL on AWS and there is no such problem. The only difference I can think of is that I have the database on RDS. I think you can try switching to RDS temporary, just to test if the issue will be still present or not. If it disappered with RDS then you'll need to look into PostgreSQL settings.
According to the RDS paramters, I have SSL enabled:
ssl = 1, Enables SSL connections.
ssl_ca_file = /rdsdbdata/rds-metadata/ca-cert.pem
ssl_cert_file = /rdsdbdata/rds-metadata/server-cert.pem
ssl_ciphers = false, Sets the list of allowed SSL ciphers.
ssl_key_file = /rdsdbdata/rds-metadata/server-key.pem
ssl_renegotiation_limit = 0, integer, (kB) Set the amount of traffic to send and receive before renegotiating the encryption keys.
As for Celery initialization code, it is roughly this
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
import sqldb
engine = sqldb.get_engine()
cached_data = None
def do_the_work():
global engine, ruckus_data
if cached_data is not None:
return cached_data
db_session = None
try:
db_session = scoped_session(sessionmaker(
autocommit=False, autoflush=False, bind=engine))
data = sqldb.get_session().query(
sqldb.system.MyModel).filter_by(
my_type = sqldb.system.MyModel.TYPEA).all()
cached_data = {}
for row in data:
... # put row into cached_data
finally:
if db_session is not None:
db_session.remove()
return cached_data
This do_the_work
function is then called from the celery task.
The sqldb.get_engine
looks like this:
from sqlalchemy import create_engine
_engine = None
def get_engine():
global _engine
if _engine:
return _engine
_engine = create_engine(config.SQL_DB_URL, echo=config.SQL_DB_ECHO)
return _engine
Finally, the SQL_DB_URI and SQL_DB_ECHO in the config module are these:
SQL_DB_URL = 'postgresql+psycopg2://%s:%s@%s/%s' % (
POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DB_NAME)
SQL_DB_ECHO = False
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With