I am trying to get all of my PC's cores to work simultaneously while filling a PostgreSQL database. I have reduced my code to an example that reproduces the error I am getting:
Traceback (most recent call last):
File "test2.py", line 50, in <module>
download_all_sites(sites)
File "test2.py", line 36, in download_all_sites
pool.map(download_site, sites)
File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
The full code that produces the error:
import requests
import multiprocessing
import time
import os
import psycopg2
# HTTP session placeholder; set_global_session() replaces it with a real
# requests.Session when the pool runs it as the worker initializer.
session = None

# Database connection and cursor opened once, when the module is loaded.
conn = psycopg2.connect(
    user="user",
    password="pass123",
    host="127.0.0.1",
    port="5432",
    database="my_db",
)
cursor = conn.cursor()
def set_global_session():
    """Create the module-level ``requests.Session`` unless one already exists."""
    global session
    if session:
        # Already initialised; keep the existing session.
        return
    session = requests.Session()
def download_site(domain):
    """Fetch ``http://<domain>`` and insert the keywords found in the page.

    Uses the module-level ``session``, ``keywords`` and ``cursor``. The INSERT
    is only executed here; committing is left to the caller.
    """
    url = "http://" + domain
    with session.get(url) as response:
        page_text = response.text.lower()

        # Collect every keyword that occurs in the lower-cased page text.
        matches = []
        for word in keywords:
            if word in page_text:
                matches.append(word)

        cursor.execute(
            """INSERT INTO test (domain, keyword) VALUES (%s, %s)""",
            (domain, matches),
        )
def download_all_sites(sites):
    """Map ``download_site`` over *sites* using a multiprocessing pool.

    The pool is sized with ``os.cpu_count()`` and every worker runs
    ``set_global_session`` once before it starts taking work.
    """
    worker_count = os.cpu_count()
    pool = multiprocessing.Pool(
        processes=worker_count,
        initializer=set_global_session,
    )
    with pool:
        pool.map(download_site, sites)
if __name__ == "__main__":
    # download_site() looks for these keywords in every page it fetches.
    keywords = ['google', 'success']
    # The same domain ten times, so there are ten tasks for the pool.
    sites = ['google.com'] * 10

    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time

    # Commit on the module-level connection once the pool has finished.
    conn.commit()
    print(f"Finished {len(sites)} in {duration} seconds")
Create a new PostgreSQL connection in each worker process. libpq connections shouldn't be used across forked processes (which is what multiprocessing does); this is mentioned in the second warning box in the Postgres docs.
import requests
import multiprocessing
import time
import os
import psycopg2
# HTTP session placeholder; stays None until set_global_session() has run.
session = None


def set_global_session():
    """Create the module-level ``requests.Session`` unless one already exists."""
    global session
    if session:
        # Already initialised; keep the existing session.
        return
    session = requests.Session()
def download_site(domain):
    """Fetch ``http://<domain>`` and record the domain in ``mytable``.

    A fresh database connection is opened inside this call rather than reusing
    one created at module level, so the connection is never inherited from the
    parent process.

    Args:
        domain: Host name to request, without the URL scheme.

    Any exception raised while inserting propagates to the caller after the
    transaction has been rolled back and the connection has been closed.
    """
    url = "http://" + domain
    # Keyword matching on the page text is intentionally left out here: the
    # request only has to complete, and just the domain is stored.
    with session.get(url):
        conn = psycopg2.connect(
            "dbname=mf port=5959 host=localhost user=mf_usr"
        )
        try:
            # ``with conn`` commits on success and rolls back on an exception,
            # but does not close the connection; ``finally`` takes care of that.
            with conn, conn.cursor() as cursor:
                query = """INSERT INTO mytable (name) VALUES (%s)"""
                cursor.execute(query, (domain,))
        finally:
            conn.close()
def download_all_sites(sites):
    """Map ``download_site`` over *sites* using a multiprocessing pool.

    The pool is sized with ``os.cpu_count()`` and every worker runs
    ``set_global_session`` once before it starts taking work.
    """
    worker_count = os.cpu_count()
    pool = multiprocessing.Pool(
        processes=worker_count,
        initializer=set_global_session,
    )
    with pool:
        pool.map(download_site, sites)
if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the pool is running.
    start_time = time.perf_counter()
    download_all_sites(sites)
    duration = time.perf_counter() - start_time
    print(f"Finished {len(sites)} in {duration} seconds")

    # make sure it worked!
    conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
    try:
        with conn.cursor() as cursor:
            cursor.execute('select count(name) from mytable')
            print(cursor.fetchall())  # verify 10 downloads == 10 records in database
    finally:
        # Release the verification connection even if the query fails.
        conn.close()
Out:
Finished 10 in 0.9922008514404297 seconds
[(10,)]
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With