Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python multiprocessing + peewee + postgresql fails with SSL error

I am trying to write a Python model which is capable of doing some processing in a PostgreSQL database using the multiprocessing module and peewee.

In single-core mode the code works; however, when I try to run the code with multiple cores I run into an SSL error.

I would like to post the structure of my model in the hope that somebody can advise how to set up my model in a proper way. Currently, I have chosen to use an object-oriented approach in which I make one connection which is shared in a pool. To clarify what I have done, I will now show the source code I have so far.

I have three files: main.py, models.py and parser.py. Their contents are as follows.

models.py defines the peewee postgresql table and makes a connection to the postgres server

import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase

# Names of columns of the Company table
KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"

# initialise the data base
# One pooled database object is created at import time; every module that
# imports `database`, and every model bound to it, uses this same object.
database = PooledPostgresqlExtDatabase(
    "testdb", user="postgres", host="localhost", port=5432, password="xxxx",
    max_connections=8, stale_timeout=300 )


class BaseModel(pw.Model):
    """Base class that binds every derived model to the module-level `database`."""
    class Meta:
        database = database
        # presumably restricts save() to the fields that were modified -- confirm against the peewee docs
        only_save_dirty = True


# this class describes the format of the sql data base
class Company(BaseModel):
    """One company record: its id, its name and two columns filled in by the processing step."""
    id_number = pw.IntegerField(primary_key=True)  # primary key of the table
    name = pw.CharField(null=True)  # company name; NULL is allowed
    n_vowels = pw.IntegerField(default=-1)  # number of vowels in `name`; stays -1 until it is stored
    processor = pw.IntegerField(default=-1)  # index of the process that handled the row; stays -1 until set


def connect_database(database_name, reset_database=False):
    """
    Open the connection of the module-level `database` and make sure the
    Company table exists.

    :param database_name: Not used by this function; the database is configured
        where `database` is created. Kept so that existing callers keep working.
    :param reset_database: When True, drop the Company table first so that it is
        recreated empty.
    """
    # reuse_if_open avoids the error peewee raises when connect() is called
    # while a connection is already open
    database.connect(reuse_if_open=True)
    if reset_database:
        database.drop_tables([Company])
    database.create_tables([Company])

parser.py contains the CompanyParser class which is used as the engine of the code to do all the processing. It generates some artificial data which is stored to the postgresql database and then the run method is used to do some processing with the data already stored in the database

import pandas as pd
import numpy as np
import random
import string
import peewee as pw
from models import (Company, database, KVK_KEY, NAME_KEY)
import multiprocessing as mp

# maximum number of records passed to a single insert_many() call
MAX_SQL_CHUNK = 1000

# Fix the seed of numpy's global random generator.
# NOTE(review): the standard-library `random` module is not seeded here, so
# values drawn with random.choice differ between runs -- confirm this is intended.
np.random.seed(0)


def random_name(size=8, chars=string.ascii_lowercase):
    """ Build a string of 'size' characters, each picked at random from 'chars' """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return "".join(picked)


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer

    Only the lowercase letters a, e, i, o and u are counted. None (for instance
    the value of a NULL name column) is treated like an empty string and gives 0.
    """
    if characters is None:
        return 0
    # build the lookup once per call instead of once per character
    vowels = frozenset("aeiou")
    return sum(1 for char in characters if char in vowels)


class CompanyParser(mp.Process):
    """
    Generate fake company records, store them in the database and process the
    companies of one id range.

    An instance created without 'i_proc' is a plain helper object used to
    generate and store the data. An instance created with 'i_proc' and more than
    one process is a real multiprocessing.Process whose run() handles the
    companies with an id between 'first_id' and 'last_id'.
    """

    def __init__(self, number_of_companies=100, i_proc=None,
                 number_of_procs=1,
                 first_id=None, last_id=None):
        """
        :param number_of_companies: number of fake companies made by generate_data
        :param i_proc: index of this worker; None for the data-generating helper
        :param number_of_procs: total number of workers
        :param first_id: lowest company id handled by run() (inclusive)
        :param last_id: highest company id handled by run() (inclusive)
        """
        # The Process base class is only initialised for real workers; on any
        # other instance start() must not be called, only run()
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)

        self.i_proc = i_proc
        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

    @staticmethod
    def _n_batches(n_records, chunk_size):
        """ Return how many chunks of at most 'chunk_size' records hold 'n_records' records """
        return (n_records + chunk_size - 1) // chunk_size

    def generate_data(self):
        """ Create a dataframe with fake company data and id's """
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        # the id is the primary key of the table, so a randomly repeated id
        # would make the insert fail
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)

    def store_to_database(self):
        """
        Store the company data to a sql database

        The records of the dataframe made by generate_data are inserted in
        chunks of at most MAX_SQL_CHUNK records inside a single transaction.
        """
        record_list = list(self.data_df.to_dict(orient="index").values())

        # rounded up, so an exact multiple of the chunk size is not over-counted
        n_batch = self._n_batches(len(record_list), MAX_SQL_CHUNK)

        with database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                Company.insert_many(batch).execute()

    def run(self):
        """
        Count the vowels in the name of every company with an id between
        first_id and last_id and save the count and the worker index.

        Runs in the child process after start(), or in the calling process when
        it is invoked directly.
        """
        print("Making query at {}".format(self.i_proc))
        query = (Company.
                 select().
                 where(Company.id_number.between(self.first_id, self.last_id)))
        print("Found {} companies".format(query.count()))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt,
                                                                company.id_number,
                                                                company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            print(f"storing number of vowels: {number_of_vowels}")
            company.save()

Finally, my main script loads the classes stored in models.py and parser.py and launches the code.

from models import (Company, connect_database)
from parser import CompanyParser

number_of_processors = 2
# opens the connection of the module-level database in this process and
# recreates the Company table
connect_database(None, reset_database=True)

# init an object of the CompanyParser and use it to fill the database
parser = CompanyParser()

# query for the ids of all companies; presumably it is only executed when it is
# counted or indexed below, i.e. after the data has been stored -- confirm
company_ids = Company.select(Company.id_number)
parser.generate_data()
parser.store_to_database()

n_companies = company_ids.count()
n_comp_per_proc = int(n_companies / number_of_processors)
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))

# NOTE(review): the database connection was opened above, before any worker is
# started -- presumably the child processes end up sharing it; confirm
# NOTE(review): when n_companies is not a multiple of number_of_processors the
# remaining companies are not assigned to any process
for i_proc in range(number_of_processors):
    # every process gets a slice of n_comp_per_proc consecutive query results
    i_start = i_proc * n_comp_per_proc
    first_id = company_ids[i_start]
    last_id = company_ids[i_start + n_comp_per_proc - 1]

    print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
    sub_parser = CompanyParser(first_id=first_id, last_id=last_id,
                               i_proc=i_proc,
                               number_of_procs=number_of_processors)

    # with a single processor the work is done in this process;
    # the started processes are not joined by this script
    if number_of_processors > 1:
        sub_parser.start()
    else:
        sub_parser.run()

In case that the number_of_processors = 1 this script works perfectly fine. It generates artificial data, stores it to the PostgreSQL database and does some processing on the data (it counts the number of vowels in the name and stores it to the n_vowels column)

However, in case I am trying to run this with 2 cores with number_of_processors = 2, I run into the following error

/opt/miniconda3/bin/python /home/eelco/PycharmProjects/multiproc_peewee/main.py
writing 0/1
Found 100 companies: 50 per proc
Running proc 0 for id 1020737 until id 5295565
Running proc 1 for id 5302405 until id 9891087
Making query at 0
Found 50 companies
Processing @ 0 - 0:  company 1020737/wqrbgxiu
storing number of vowels: 2
Making query at 1
Process CompanyParser-1:
Processing @ 0 - 1:  company 1086107/lkbagrbc
storing number of vowels: 1
Processing @ 0 - 2:  company 1298367/nsdjsqio
storing number of vowels: 2
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 82, in run
    company.save()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 5748, in save
    rows = self.update(**field_dict).where(self._pk_expr()).execute()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2121, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: sslv3 alert bad record mac

Process CompanyParser-2:
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: decryption failed or bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 72, in run
    print("Found {} companies".format(query.count()))
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1881, in count
    return Select([clone], [fn.COUNT(SQL('1'))]).scalar(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1866, in scalar
    row = self.tuples().peek(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1853, in peek
    rows = self.execute(database)[:n]
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1847, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: decryption failed or bad record mac


Process finished with exit code 0

Somehow something goes wrong as soon as the second process starts to do something with the database. Does somebody have advice on how to get this code working? I have tried the following already:

  • Try the PooledPostgresDatabase and normal PostgresqlDatabase to connect to the database. This leads to the same error
  • Try using sqlite instead of postgres. This works for 2 cores, but only if the two processes are not interfering too much; otherwise I get some locking problems. I was under the impression that postgres would be better for doing multiprocessing than sqlite (is that true?)
  • When putting a break after launching the first process(so effectively using only one core), the code works, showing that the start method is called correctly.

Hopefully somebody can advise.

Regards Eelco

like image 426
Eelco van Vliet Avatar asked Jan 22 '19 22:01

Eelco van Vliet


1 Answers

After some searching on the internet today I found the solution for my problem here: github.com/coleifer. As coleifer mentions: you apparently first have to set up all the forks before you start connecting to the database. Based on this idea I have modified my code and it is working now.

For those interested I will post my python scripts again so you can see how I did it. I do this because there are not many explicit examples out there, so perhaps it may help others.

First of all, all the database and peewee modules are now moved into initialization functions which are only called inside the constructor of the CompanyParser class. So models.py looks like

import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase, PostgresqlDatabase, PooledPostgresqlDatabase

# Names of columns of the Company table
KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"


def init_database():
    """ Create and return a new pooled PostgreSQL database object for "testdb" """
    settings = dict(user="postgres", host="localhost", port=5432, password="xxxxx",
                    max_connections=8, stale_timeout=300)
    return PooledPostgresqlDatabase("testdb", **settings)


def init_models(db, reset_tables=False):
    """
    Define the peewee models on 'db' and make sure the Company table exists.

    The model classes are defined anew on every call, so each database object
    gets its own Company class. The connection of 'db' is opened when it is
    found closed.

    :param db: database object the models are bound to
    :param reset_tables: when True, drop an existing Company table first so that
        it is recreated empty
    :return: the Company model class bound to 'db'
    """

    class BaseModel(pw.Model):
        class Meta:
            database = db

    # this class describes the format of the sql data base
    class Company(BaseModel):
        id_number = pw.IntegerField(primary_key=True)
        name = pw.CharField(null=True)
        n_vowels = pw.IntegerField(default=-1)
        processor = pw.IntegerField(default=-1)

    tables = [Company]

    if db.is_closed():
        db.connect()

    # only an existing table is dropped
    if reset_tables:
        if Company.table_exists():
            db.drop_tables(tables)
    db.create_tables(tables)

    return Company

Then, the worker class CompanyParser is defined in the parser.py script and looks like this

import multiprocessing as mp
import random
import string

import numpy as np
import pandas as pd
import peewee as pw

from models import (KVK_KEY, NAME_KEY, init_database, init_models)

# maximum number of records passed to a single insert_many() call
MAX_SQL_CHUNK = 1000

# Fix the seed of numpy's global random generator.
# NOTE(review): the standard-library `random` module is not seeded here, so
# values drawn with random.choice differ between runs -- confirm this is intended.
np.random.seed(0)


def random_name(size=32, chars=string.ascii_lowercase):
    """ Build a string of 'size' characters, each picked at random from 'chars' """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return "".join(picked)


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer

    Only the lowercase letters a, e, i, o and u are counted. None (for instance
    the value of a NULL name column) is treated like an empty string and gives 0.
    """
    if characters is None:
        return 0
    # build the lookup once per call instead of once per character
    vowels = frozenset("aeiou")
    return sum(1 for char in characters if char in vowels)


class CompanyParser(mp.Process):
    """
    Generate fake company records, store them in the database and process the
    companies of one id range.

    Every instance creates its own database object and its own Company model in
    the constructor, so no database object is shared between two parsers. An
    instance created with 'i_proc' and more than one process is a real
    multiprocessing.Process; any other instance is a plain helper object.
    """

    def __init__(self, reset_tables=False,
                 number_of_companies=100, i_proc=None,
                 number_of_procs=1, first_id=None, last_id=None):
        """
        :param reset_tables: passed on to init_models as 'reset_tables'
        :param number_of_companies: number of fake companies made by generate_data
        :param i_proc: index of this worker; None for the data-generating helper
        :param number_of_procs: total number of workers
        :param first_id: lowest company id handled by run() (inclusive)
        :param last_id: highest company id handled by run() (inclusive)
        """
        # The Process base class is only initialised for real workers; on any
        # other instance start() and join() must not be called, only run()
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)

        self.i_proc = i_proc
        self.reset_tables = reset_tables

        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

        # initialise the database and models
        self.database = init_database()
        self.Company = init_models(self.database, reset_tables=self.reset_tables)

    @staticmethod
    def _n_batches(n_records, chunk_size):
        """ Return how many chunks of at most 'chunk_size' records hold 'n_records' records """
        return (n_records + chunk_size - 1) // chunk_size

    def generate_data(self):
        """ Create a dataframe with fake company data and id's and return the array of id's"""
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        # the id is the primary key of the table, so repeated ids are removed
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)
        return self.data_df[KVK_KEY].values

    def store_to_database(self):
        """
        Store the company data to a sql database

        The records of the dataframe made by generate_data are inserted in
        chunks of at most MAX_SQL_CHUNK records inside a single transaction.
        """
        record_list = list(self.data_df.to_dict(orient="index").values())

        # rounded up, so an exact multiple of the chunk size is not over-counted
        n_batch = self._n_batches(len(record_list), MAX_SQL_CHUNK)

        with self.database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                self.Company.insert_many(batch).execute()

    def run(self):
        """
        Count the vowels in the name of every company with an id between
        first_id and last_id and save the count and the worker index.

        A save that fails with an OperationalError or InterfaceError is reported
        and skipped, so one failing record does not stop the worker.
        """
        query = (self.Company.
                 select().
                 where(self.Company.id_number.between(self.first_id, self.last_id)))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt, company.id_number,
                                                                company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            try:
                company.save()
            except (pw.OperationalError, pw.InterfaceError) as err:
                print("failed save for {} {}: {}".format(self.i_proc, cnt, err))

Finally, the main.py script which launches the processes:

from parser import CompanyParser
import time


def main():
    """
    Fill the database with fake companies and process them with several workers.

    A first CompanyParser creates and stores the data and its connection is
    closed before any worker is created. Each worker then gets a contiguous
    range of the sorted company ids; the last worker also takes the ids that
    remain when the number of companies is not a multiple of the number of
    processors.
    """
    number_of_processors = 2
    number_of_companies = 10000

    parser = CompanyParser(number_of_companies=number_of_companies, reset_tables=True)
    company_ids = parser.generate_data()
    parser.store_to_database()

    n_companies = company_ids.size
    n_comp_per_proc = n_companies // number_of_processors
    print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))
    if not parser.database.is_closed():
        parser.database.close()

    workers = []  # every parser that was created, to close its connection afterwards
    started = []  # only the parsers that run as a separate process and must be joined
    for i_proc in range(number_of_processors):
        i_start = i_proc * n_comp_per_proc
        if i_proc == number_of_processors - 1:
            # the last worker runs to the final id so that no company is skipped
            i_end = n_companies - 1
        else:
            i_end = i_start + n_comp_per_proc - 1
        first_id = company_ids[i_start]
        last_id = company_ids[i_end]

        print(f"Running proc {i_proc} for id {first_id} until id {last_id}")

        sub_parser = CompanyParser(first_id=first_id, last_id=last_id, i_proc=i_proc,
                                   number_of_procs=number_of_processors)

        if number_of_processors > 1:
            sub_parser.start()
            started.append(sub_parser)
        else:
            # single-processor mode: the work is done in this process and the
            # parser is not a started Process, so it must not be joined
            sub_parser.run()

        workers.append(sub_parser)

    # this blocks the script until all processes are done
    for job in started:
        job.join()

    # make sure all the connections are closed
    for worker in workers:
        db = worker.database
        if not db.is_closed():
            db.close()
    print("Goodbye!")


if __name__ == "__main__":
    # time the complete run and report the elapsed wall-clock time
    t_begin = time.time()
    main()
    elapsed = time.time() - t_begin
    print(f"Done in {elapsed} s")

As you can see, the database connection is done per process inside the class. This example works and is a full example of multiprocessing + peewee and PostgreSQL. Hopefully this may help others. In case you have any comments or suggestions for improvement please let me know.

like image 171
Eelco van Vliet Avatar answered Sep 18 '22 22:09

Eelco van Vliet