I have a function that parses a file and inserts the data into MySQL using SQLAlchemy. I've been running the function sequentially on the result of os.listdir()
and everything works perfectly.
Because most of the time is spent reading the file and writing to the DB, I wanted to use multiprocessing to speed things up. Here is my pseduocode as the actual code is too long:
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
The problem I'm seeing is that the script hangs and never finishes. I usually get 48 of 63 records into the database. Sometimes it's more, sometimes it's less.
I've tried using pool.close()
and in combination with pool.join()
and neither seems to help.
How do I get this script to complete? What am I doing wrong? I'm using Python 2.7.8 on a Linux box.
The problem was a combination of 2 things:
I made the following changes and everything works now: Original File
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session = get_session() # see below
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
DB File
def get_session():
engine = create_engine('mysql://root:root@localhost/my_db')
Base.metadata.create_all(engine)
Base.metadata.bind = engine
db_session = sessionmaker(bind=engine)
return db_session()
You need to put all code which uses multiprocessing, inside its own function. This stops it recursively launching new pools when multiprocessing re-imports your module in separate processes:
def parse_file(filename):
...
def main():
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
if __name__ == '__main__:
main()
See the documentation about making sure your module is importable, also the advice for running on Windows(tm)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With