I have a Flask Python application where I'm using the multiprocessing library to run a function in a separate process, in parallel with the main process. The function uploads files to OneDrive, and it also needs to read from and write to my DB.
My app uses a PostgreSQL database and SQLAlchemy. To access the DB from these child processes, I create a new SQLAlchemy engine every time the function is executed. I'm getting a couple of different errors while the child processes run the function, and the errors are very intermittent:
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected
I've tried to research these errors, but many of the posts I've found have setups different enough that I'm not sure they apply in my case. I've tried some of the solutions suggested in those posts, but they don't prevent the errors. I'm not very experienced with SQLAlchemy, so I'm flying blind here.
# Initialize the class that contains my upload function
vmb_client = vmb_controller()
# Upload files asynchronously
from multiprocessing import Process
p = Process(target=vmb_client.upload_file, args=(<arguments>))
p.start()
def upload_file(self, corp_index, filename):
    # Local imports as in the original; they can also live at module level.
    # text() is needed because SQLAlchemy 1.4+ expects raw SQL to be wrapped in it.
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker

    #
    # Set up a new DB connection for this process (db_uri is defined elsewhere)
    #
    engine = create_engine(db_uri)
    Session = sessionmaker(bind=engine)
    sess = Session()
    try:
        #
        # This does the API call to upload the file. It doesn't involve any DB interaction.
        # The result is stored in `res`, the name the SQL below reads from.
        #
        res = self.call_upload_file(<arguments>)
        #
        # Track the uploaded file in the DB. Bound parameters replace the f-string
        # interpolation, so quotes in names/paths need no manual escaping.
        #
        sess.execute(
            text("""
                INSERT INTO corporate.vmb_items (corporation_key, onedrive_item_id, parent_id, type, name,
                    created_datetime, modified_datetime, path, web_url, child_count, size)
                VALUES (:corp_index, :item_id, :parent_id, 'file', :name,
                    :created, :modified, :path, :web_url, :child_count, :size)
            """),
            {
                "corp_index": corp_index,
                "item_id": res.get("id"),
                "parent_id": res.get("parent_id"),
                "name": res.get("name"),
                "created": res.get("created_datetime"),
                "modified": res.get("modified_datetime"),
                "path": res.get("path"),
                "web_url": res.get("web_url"),
                "child_count": res.get("child_count"),
                "size": res.get("size"),
            },
        )
        #
        # Update the child count of the parent folder
        #
        sess.execute(
            text("""
                UPDATE corporate.vmb_items AS i
                SET child_count = (SELECT COUNT(vmb_item_key) FROM corporate.vmb_items AS sub
                                   WHERE sub.parent_id = :parent_id)
                WHERE i.onedrive_item_id = :parent_id
            """),
            {"parent_id": res.get("parent_id")},
        )
        sess.commit()
    except Exception:
        # Never leave a failed transaction open on the connection
        sess.rollback()
        raise
    finally:
        #
        # Cleanup
        #
        sess.close()
        engine.dispose()
    return res
Based on other posts I found about these errors, I tried a couple of different parameters in my create_engine call, but they made no difference:
from sqlalchemy.pool import NullPool
corporate_engine = create_engine(db_uri, pool_pre_ping=True, poolclass=NullPool)
I recommend enabling debug logging for connection pool events. This may help you understand which event your problem occurs in:
from sqlalchemy import create_engine
engine = create_engine("postgresql://scott:tiger@localhost/test", echo_pool="debug")
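If you would rather configure this through the standard logging module than through echo_pool, a rough equivalent is to attach a handler to the "sqlalchemy.pool" logger at DEBUG level. The sketch below also puts the process ID in every record, which can help spot a connection being used by more than one process. The in-memory SQLite URL and the StringIO buffer are stand-ins chosen for illustration; in your app you would use your own db_uri and a real handler (stderr or a file).

```python
import io
import logging

from sqlalchemy import create_engine, text

# Collect pool log records in memory (stand-in for a real stderr/file handler)
log_buffer = io.StringIO()
handler = logging.StreamHandler(log_buffer)
# %(process)d adds the PID, so parent and child records can be told apart
handler.setFormatter(logging.Formatter("%(process)d %(name)s %(message)s"))

# Pool checkout/checkin events are logged at DEBUG on loggers under "sqlalchemy.pool"
pool_logger = logging.getLogger("sqlalchemy.pool")
pool_logger.setLevel(logging.DEBUG)
pool_logger.addHandler(handler)

# Stand-in URL for illustration; use your PostgreSQL db_uri here
engine = create_engine("sqlite://")
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
engine.dispose()

print(log_buffer.getvalue())
```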
In my case, the problem was the reset that happens on return to the pool. Many processes were requesting connections from the pool at the same time. The default value of create_engine's pool_reset_on_return parameter is "rollback" (the same as True), so every connection is reset with a rollback when it is returned to the pool. I fixed it with these steps:
1. Call engine.dispose() right before forking another process. This releases the parent process's pooled connections, so new connections are requested from a fresh pool instead of being shared with the child.
2. Pass pool_reset_on_return=None to create_engine (False also worked for me). Connections are then no longer reset when they are returned to the pool.
This fix can cause other serious problems, such as leaving uncommitted work open on a pooled connection, so make sure your code always commits or rolls back explicitly. Here is the official documentation: SQLAlchemy Official Doc.
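The two steps can be sketched together as follows. This is a minimal sketch under several assumptions: a throwaway SQLite file stands in for the PostgreSQL db_uri, and the helper names make_engine, child_task and start_child and the items table are hypothetical. It asks for the "fork" start method explicitly, because inherited connections are only a concern when the child is forked from the parent. engine.begin() commits on success and rolls back on an exception, which covers the uncommitted-work risk of disabling the reset.

```python
import multiprocessing
import os
import tempfile

from sqlalchemy import create_engine, text


def make_engine(db_uri):
    # Hypothetical helper. pool_reset_on_return=None: connections are NOT rolled
    # back when returned to the pool, so each unit of work must commit or roll back.
    return create_engine(db_uri, pool_reset_on_return=None)


def child_task(db_uri, name):
    # Runs in the forked child: build a fresh engine instead of reusing the parent's
    engine = make_engine(db_uri)
    try:
        # begin() commits on success and rolls back if an exception is raised
        with engine.begin() as conn:
            conn.execute(text("INSERT INTO items (name) VALUES (:name)"), {"name": name})
    finally:
        engine.dispose()


def start_child(parent_engine, db_uri, name):
    # Step 1: drop the parent's pooled connections right before forking,
    # so the child cannot inherit a connection the parent is also using
    parent_engine.dispose()
    ctx = multiprocessing.get_context("fork")
    p = ctx.Process(target=child_task, args=(db_uri, name))
    p.start()
    return p


if __name__ == "__main__":
    db_uri = "sqlite:///" + os.path.join(tempfile.mkdtemp(), "demo.db")
    engine = make_engine(db_uri)
    with engine.begin() as conn:
        conn.execute(text("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)"))
    p = start_child(engine, db_uri, "uploaded.txt")
    p.join()
    with engine.connect() as conn:
        print(conn.execute(text("SELECT name FROM items")).scalars().all())
```

In the Flask app, the parent engine passed to start_child would be whatever engine the main process already holds, and child_task would do the upload and DB writes.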