Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

decryption failed or bad record mac in multiprocessing

I am trying to get all the PC cores to work simultaneously while filling a PostgreSQL database, I have edited the code to make a reproducible error of what I am getting

Traceback (most recent call last):
  File "test2.py", line 50, in <module>
    download_all_sites(sites)
  File "test2.py", line 36, in download_all_sites
    pool.map(download_site, sites)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
psycopg2.OperationalError: SSL error: decryption failed or bad record mac

The full code which makes the error

import requests
import multiprocessing
import time
import os
import psycopg2
session = None
conn = psycopg2.connect(user="user",
                        password="pass123",
                        host="127.0.0.1",
                        port="5432",
                        database="my_db")
cursor = conn.cursor()


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    url = "http://" + domain
    with session.get(url) as response:
        temp = response.text.lower()
        found = [i for i in keywords if i in temp]
        query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        cursor.execute(query, (domain, found))


def download_all_sites(sites):
    with multiprocessing.Pool(processes=os.cpu_count(), initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    conn.commit()
    print(f"Finished {len(sites)} in {duration} seconds")
like image 809
wishmaster Avatar asked May 21 '26 03:05

wishmaster


1 Answers

Create a new postgres connection for each multiprocess. Libpq connections shouldn’t be used with forked processes (what multiprocessing is doing), it is mentioned in the second warning box at the postgres docs.

import requests
import multiprocessing
import time
import os
import psycopg2
session = None    

def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    url = "http://" + domain
    with session.get(url) as response:
        #temp = response.text.lower()
        #found = [i for i in keywords if i in temp]
        #query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        conn = psycopg2.connect(
            "dbname=mf port=5959 host=localhost user=mf_usr"
        )
        cursor = conn.cursor()
        query = """INSERT INTO mytable (name) VALUES (%s)"""
        cursor.execute(query, (domain, ))
        conn.commit()
        conn.close()


def download_all_sites(sites):
    with multiprocessing.Pool(
        processes=os.cpu_count(), initializer=set_global_session
    ) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Finished {len(sites)} in {duration} seconds")

    # make sure it worked!
    conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
    cursor = conn.cursor()
    cursor.execute('select count(name) from mytable')
    print(cursor.fetchall())  # verify 10 downloads == 10 records in database

Out:

Finished 10 in 0.9922008514404297 seconds
[(10,)]
like image 190
Maurice Meyer Avatar answered May 23 '26 16:05

Maurice Meyer



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!