I have a SQLite3 DB. I need to parse 10,000 files. I read some data from each file and then query the DB with that data to get a result. My code works fine in a single-process environment, but I get an error when trying to use a multiprocessing Pool.
My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
       foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB
My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), files)
4. pool.close()
5. Close DB
I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.
My DB class is implemented as follows:
import logging
import sqlite3
import sys

log = logging.getLogger(__name__)

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
        sqlite_file -- File path
    Return:
        Connection
    """
    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)
    return conn
def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
        conn -- Connection
    """
    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()
class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
            sqlite_file -- File path
        """
        # 1. Open database.
        # 2. Set up to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file = sqlite_file
        self._conn = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor = self._conn.cursor()

    def close(self):
        """Close DB connection."""
        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
        ...
Function foo() looks like this:
def foo(f, x1, x2, db):
    # ... extract some data from file f ...
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)
The overall implementation that does not work is as follows:
mapdb = MapDB(sqlite_file)
log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [...]  # get list of files to process
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()
To fix this, I think I need to create the MapDB() object inside each pool worker (so there are four parallel, independent connections), but I'm not sure how to do this. Can someone show me an example of how to accomplish this with Pool?
Some relevant background from the SQLite documentation: SQLite allows multiple processes to have the database file open at once, and multiple processes can read the database at once; when any process wants to write, it must lock the entire database file for the duration of its update. In serialized threading mode, SQLite can also be safely used by multiple threads with no restriction.
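As a minimal sketch of that multi-reader behavior (the database path test.db and table some_table below are made-up placeholders), each process opens its own connection and reads concurrently without contention:

import multiprocessing
import sqlite3

def count_rows(db_path):
    # Each worker process opens its own connection; concurrent reads are fine.
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute('SELECT COUNT(*) FROM some_table').fetchone()[0]
    finally:
        conn.close()

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(count_rows, ['test.db'] * 4))

The key point is that the connection object itself is never shared between processes; only the file path is.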
What about defining foo like this:
def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)   # open a connection inside the worker
    # ... process data ...
    mapdb.close()
and then change your pool.map call to:
pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)
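Opening and closing a connection per file works, but if that overhead matters, a variation (a sketch of my own, not part of the suggestion above; the names init_worker and _mapdb are made up) is to open one MapDB per worker process via Pool's initializer hook, so each of the four workers reuses a single connection:

import functools
import multiprocessing

_mapdb = None  # one connection per worker process, set by init_worker

def init_worker(db_path):
    # Runs once in each worker process when the pool starts.
    global _mapdb
    _mapdb = MapDB(db_path)

def foo(f, x1, x2):
    # Use the connection owned by this worker process.
    r1 = _mapdb.get_driver_net(...)
    r2 = _mapdb.get_cell_id(...)

pool = multiprocessing.Pool(processes=4, initializer=init_worker,
                            initargs=(sqlite_file,))
pool.map(functools.partial(foo, x1=x1, x2=x2), files)
pool.close()
pool.join()

The per-worker connections are closed by the operating system when the pool's worker processes exit.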
Update
Another option is to handle the worker threads yourself and distribute work via a Queue.
from queue import Queue   # Python 2: from Queue import Queue
from threading import Thread

q = Queue()

def worker():
    mapdb = MapDB(sqlite_file)   # each worker thread gets its own connection
    while True:
        item = q.get()
        if item[0] == "file":
            f = item[1]
            # ... process file f ...
            q.task_done()
        else:
            q.task_done()
            break
    mapdb.close()

# Start up the workers
nworkers = 4
for i in range(nworkers):
    t = Thread(target=worker)   # don't rebind the name 'worker' here
    t.daemon = True
    t.start()

# Place work on the Queue
for x in files:
    q.put(("file", x))

# Place termination tokens onto the Queue
for i in range(nworkers):
    q.put(("end",))

# Wait for all work to be done.
q.join()
The termination tokens are used to ensure that each worker's sqlite connection is closed cleanly, in case that matters.
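Since the original goal was multiprocessing rather than threading, the same token protocol also works with separate processes. Here is a sketch under that assumption; it reuses the MapDB class from the question and passes the queue explicitly as an argument so it also works on spawn-based platforms:

import multiprocessing

def worker(q, db_path):
    mapdb = MapDB(db_path)   # one connection per process
    while True:
        item = q.get()
        if item[0] == "file":
            # ... process item[1] ...
            q.task_done()
        else:
            q.task_done()
            break
    mapdb.close()

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    procs = [multiprocessing.Process(target=worker, args=(q, sqlite_file))
             for _ in range(4)]
    for p in procs:
        p.start()
    for x in files:
        q.put(("file", x))
    for _ in range(4):
        q.put(("end",))
    q.join()   # wait until every queued item is marked done
    for p in procs:
        p.join()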