
Python multiprocessing pool hangs on map call

I have a function that parses a file and inserts the data into MySQL using SQLAlchemy. I've been running the function sequentially on the result of os.listdir() and everything works perfectly.

Because most of the time is spent reading the file and writing to the DB, I wanted to use multiprocessing to speed things up. Here is my pseudocode, as the actual code is too long:

import os
import multiprocessing as mp

from bs4 import BeautifulSoup

def parse_file(filename):
    f = open(filename, 'rb')
    data = f.read()
    f.close()

    soup = BeautifulSoup(data, features="lxml", from_encoding='utf-8')

    # parse file here

    db_record = MyDBRecord(parsed_data)

    # session is a module-level SQLAlchemy session shared by all workers
    session.add(db_record)
    session.commit()

pool = mp.Pool(processes=8)

pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

The problem I'm seeing is that the script hangs and never finishes. I usually get 48 of 63 records into the database. Sometimes it's more, sometimes it's less.

I've tried using pool.close(), both on its own and in combination with pool.join(), and neither seems to help.
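For reference, the close()/join() pattern I mean looks like this (the files list and worker count are illustrative):

pool = mp.Pool(processes=8)
pool.map(parse_file, files)   # blocks until every file has been processed
pool.close()                  # tell the pool no more work is coming
pool.join()                   # wait for the worker processes to exit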

How do I get this script to complete? What am I doing wrong? I'm using Python 2.7.8 on a Linux box.

asked Oct 07 '15 by wspeirs

2 Answers

The problem was a combination of two things:

  1. my pool code being called multiple times (thanks @Peter Wood)
  2. my DB code opening too many sessions and/or sharing a session across processes

I made the following changes and everything works now.

Original File

import os
import multiprocessing as mp

from bs4 import BeautifulSoup

def parse_file(filename):
    f = open(filename, 'rb')
    data = f.read()
    f.close()

    soup = BeautifulSoup(data, features="lxml", from_encoding='utf-8')

    # parse file here

    db_record = MyDBRecord(parsed_data)

    session = get_session() # each call gets its own session -- see below (fix #2)
    session.add(db_record)
    session.commit()

if __name__ == '__main__':  # guard so the pool is only created in the main process (fix #1)
    pool = mp.Pool(processes=8)
    pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

DB File

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def get_session():
    # a fresh engine and session per call, so nothing is shared between processes
    engine = create_engine('mysql://root:root@localhost/my_db')

    Base.metadata.create_all(engine)
    Base.metadata.bind = engine

    db_session = sessionmaker(bind=engine)

    return db_session()
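As a side note on the design: get_session() builds a new engine for every file, which works but is relatively heavyweight. Below is a minimal alternative sketch, not from the original answer, assuming the same connection URL; the init_worker() function and the per-process Session global are hypothetical names. It uses the Pool's initializer argument so each worker process creates its engine and session factory once and reuses them for all of its files.

import os
import multiprocessing as mp

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Session = None  # per-process session factory, set once in each worker by init_worker()

def init_worker():
    # runs once inside every worker process, so the engine is never shared across forks
    global Session
    engine = create_engine('mysql://root:root@localhost/my_db')
    Session = sessionmaker(bind=engine)

def parse_file(filename):
    session = Session()
    try:
        # ... read and parse the file, build db_record as in the answer above ...
        session.add(db_record)
        session.commit()
    finally:
        session.close()

if __name__ == '__main__':
    pool = mp.Pool(processes=8, initializer=init_worker)
    pool.map(parse_file, ['my_dir/' + f for f in os.listdir('my_dir')])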
answered by wspeirs

You need to put all the code which uses multiprocessing inside its own function. This stops it recursively launching new pools when multiprocessing re-imports your module in separate processes:

import os
import multiprocessing as mp

def parse_file(filename):
    ...

def main():
    pool = mp.Pool(processes=8)
    pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

if __name__ == '__main__':
    main()

See the documentation about making sure your module is importable, and also the advice for running on Windows(tm).

answered by Peter Wood