Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: Using sqlite3 with multiprocessing

Tags:

python

sqlite

I have a SQLite3 DB. I need to parse 10000 files. I read some data from each file, and then query the DB with this data to get a result. My code works fine in a single process environment. But I get an error when trying to use the mulitprocessing Pool.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files: 
     foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB 

I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.

My DB class is implemented as follows:

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
    sqlite_file -- File path

    Return:
    Connection
    """

    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error, e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)

    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
    conn -- Connection
    """

    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
        sqlite_file -- File path
        """

        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file      = sqlite_file
        self._conn             = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor           = self._conn.cursor()

    def close(self):
        """Close DB connection."""

        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
       ...

Function foo() looks like this:

def foo(f, x1, x2, db):

  extract some data from file f
  r1 = db.get_driver_net(...)
  r2 = db.get_cell_id(...)

The overall not working implementation is as follows:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]                 
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)    
pool.close()
mapdb.close()

To fix this, I think I need to create the MapDB() object inside each pool worker (so have 4 parallel/independent connections). But I'm not sure how to do this. Can someone show me an example of how to accomplish this with Pool?

like image 621
user4979733 Avatar asked May 13 '16 01:05

user4979733


People also ask

Does SQLite support multiprocessing?

SQLite allows multiple processes to have the database file open at once, and for multiple processes to read the database at once. When any process wants to write, it must lock the entire database file for the duration of its update.

Is SQLite multithreaded?

In serialized mode, SQLite can be safely used by multiple threads with no restriction.

Is SQLite good for Python?

Python SQLite3 module is used to integrate the SQLite database with Python. It is a standardized Python DBI API 2.0 and provides a straightforward and simple-to-use interface for interacting with SQLite databases. There is no need to install this module separately as it comes along with Python after the 2.5x version.

How does Python integrate with SQLite?

Connect To Database#!/usr/bin/python import sqlite3 conn = sqlite3. connect('test. db') print "Opened database successfully"; Here, you can also supply database name as the special name :memory: to create a database in RAM.


1 Answers

What about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    ... open mapdb
    ... process data ...
    ... close mapdb

and then change your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    

Update

Another option is to handle the worker threads yourself and distribute work via a Queue.

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
  mapdb = ...open the sqlite database
  while True:
    item = q.get()
    if item[0] == "file":
      file = item[1]
      ... process file ...
      q.task_done()
    else:
      q.task_done()
      break
  ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
  worker = Thread(target=worker)
  worker.daemon = True
  worker.start()

# Place work on the Queue

for x in ...list of files...:
  q.put(("file",x))

# Place termination tokens onto the Queue

for i in range(nworkers):
  q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are used to ensure that the sqlite connections are closed - in case that matters.

like image 182
ErikR Avatar answered Sep 30 '22 18:09

ErikR