Hang in Python script using SQLAlchemy and multiprocessing

Consider the following Python script, which uses SQLAlchemy and the Python multiprocessing module. This is with Python 2.6.6-8+b1 (default) and SQLAlchemy 0.6.3-3 (default) on Debian squeeze. This is a simplified version of some actual code.

import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 5 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # apply do() to every dict in arglist asynchronously
r.get()
r.wait()
pool.close()
pool.join()

This script hangs with the following error message.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))

Of course, the syntax error here is TRUNCATE foo%s;. My question is, why is the process hanging, and can I persuade it to exit with an error instead, without doing major surgery to my code? This behavior is very similar to that of my actual code.

Note that the hang does not occur if the statement is replaced by something like print foobarbaz. Also, the hang still happens if we replace

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()

by just Session.execute("TRUNCATE foo%s;")

I'm using the former version because it is closer to what my actual code is doing.

Also, removing multiprocessing from the picture and looping over the tables serially makes the hang go away, and it just exits with an error.

I'm also kind of puzzled by the form of the error, particularly the TypeError: ('__init__() takes at least 4 arguments (2 given)' bit. Where is this error coming from? It seems likely it is from somewhere in the multiprocessing code.

The PostgreSQL logs aren't helpful. I see lots of lines like

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

but nothing else that seems relevant.

UPDATE 1: Thanks to lbolla and his insightful analysis, I was able to file a Python bug report about this. See sbt's analysis in that report, and also here. See also the Python bug report Fix exception pickling. So, following sbt's explanation, we can reproduce the original error with

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)

which gives

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)
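If you want to reproduce this without SQLAlchemy or a database, a stand-in class with the same shape appears to be enough. The sketch assumes, following sbt's explanation, that the culprit is an __init__ that requires several arguments but passes a single message on to Exception.__init__, which matches the one-element args tuple in the original traceback. FakeProgrammingError is a hypothetical name, not SQLAlchemy's class.

```python
import pickle

class FakeProgrammingError(Exception):
    """Hypothetical stand-in: three required arguments, but only one
    message is passed on to Exception.__init__, so e.args has length 1."""
    def __init__(self, statement, params, orig):
        Exception.__init__(self, "(ProgrammingError) %s" % (orig,))
        self.statement = statement
        self.params = params
        self.orig = orig

e = FakeProgrammingError("TRUNCATE foo%s;", {}, None)

# What BaseException.__reduce__ relies on: rebuilding the exception from e.args.
try:
    type(e)(*e.args)
    rebuild_error = None
except TypeError as exc:
    rebuild_error = str(exc)

# The pickle round trip that multiprocessing performs fails the same way.
try:
    pickle.loads(pickle.dumps(e))
    unpickle_error = None
except TypeError as exc:
    unpickle_error = str(exc)
```

Note that pickle.dumps succeeds; it is the loads step, which in the original script runs in the pool's result-handler thread, that raises.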

UPDATE 2: This has been fixed, at least for SQLAlchemy, by Mike Bayer, see the bug report StatementError Exceptions un-pickable.. Per Mike's suggestion, I also reported a similar bug to psycopg2, though I didn't (and don't) have an actual example of breakage. Regardless, they have apparently fixed it, though they gave no details of the fix. See psycopg exceptions cannot be pickled. For good measure, I also reported a Python bug ConfigParser exceptions are not pickleable corresponding to the SO question lbolla mentioned. It seems they want a test for this.

Anyway, this looks like it will continue to be a problem for the foreseeable future, since, by and large, Python developers don't seem to be aware of this issue and so don't guard against it. Surprisingly, there don't seem to be enough people using multiprocessing for this to be a well-known issue, or maybe they just put up with it. I hope the Python developers get around to fixing it, at least for Python 3, because it is annoying.

I accepted lbolla's answer, as without his explanation of how the problem was related to exception handling, I would likely have gone nowhere in understanding this. I also want to thank sbt, who explained that Python not being able to pickle exceptions was the problem. I'm very grateful to both of them, and please vote their answers up. Thanks.

UPDATE 3: I posted a followup question: Catching unpickleable exceptions and re-raising.

Faheem Mitha, asked Jan 09 '12 09:01


3 Answers

I believe the TypeError comes from multiprocessing's get.

I've stripped out all the DB code from your script. Take a look at this:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 5 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # apply do() to every dict in arglist asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Using r.wait returns the expected result, but using r.get raises TypeError. As described in Python's docs, use r.wait after a map_async.

Edit: I have to amend my previous answer. I now believe the TypeError comes from SQLAlchemy. I've amended my script to reproduce the error.

Edit 2: It looks like the problem is that multiprocessing.pool does not play well if any worker raises an Exception whose constructor requires a parameter (see also here).

I've amended my script to highlight this.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

In your case, given that your code raises an SQLAlchemy exception, the only solution I can think of is to catch all the exceptions in the do function and re-raise a normal Exception instead. Something like this:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Edit 3: So it seems to be a bug in Python, but properly written exceptions in SQLAlchemy would work around it; hence, I've raised the issue with SQLAlchemy, too.

As a workaround for the problem, I think the solution at the end of Edit 2 would do (wrapping the body of the worker function in try/except and re-raising a plain Exception).
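The try/except wrapper from Edit 2 can be packaged as a decorator so every worker function gets it without editing its body. This is only a sketch: reraise_picklable and TwoArgError are hypothetical names, and the decorator keeps the original exception's repr and formatted traceback as text, so the caller can no longer catch the original exception type. The pickle round trip at the end stands in for what the pool does with a worker's exception.

```python
import functools
import pickle
import traceback

def reraise_picklable(func):
    """Hypothetical decorator: if func raises, re-raise a plain Exception
    whose only argument is a string, which pickles and unpickles cleanly."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Keep the original type, message and traceback as plain text.
            raise Exception("%r\n%s" % (e, traceback.format_exc()))
    return wrapper

class TwoArgError(Exception):
    """Hypothetical exception that would NOT survive pickling on its own:
    two required arguments, but only one message passed to the base class."""
    def __init__(self, statement, params):
        Exception.__init__(self, "error in %s" % (statement,))
        self.params = params

@reraise_picklable
def do(kwargs):
    raise TwoArgError("TRUNCATE foo%d;" % kwargs['i'], {})

message = None
try:
    do({'i': 3})
except Exception as e:
    # Simulate the pool: pickle the worker's exception, then unpickle it.
    message = pickle.loads(pickle.dumps(e)).args[0]
```

With the question's do() decorated this way, the pool's result handler should only ever have to unpickle a plain Exception carrying a string.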

lbolla, answered Nov 20 '22 16:11


The TypeError: ('__init__() takes at least 4 arguments (2 given)' error isn't related to the SQL you're trying to execute; it has to do with how you're using SQLAlchemy's API.

The trouble is that you're trying to call execute on the session class rather than an instance of that session.

Try this:

session = Session()
session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
session.commit()

From the docs:

It is intended that the sessionmaker() function be called within the global scope of an application, and the returned class be made available to the rest of the application as the single class used to instantiate sessions.

So Session = sessionmaker() returns a new session class and session = Session() returns an instance of that class which you can then call execute on.

Stephen Emslie, answered Nov 20 '22 18:11


I don't know about the cause of the original exception. However, multiprocessing's problems with "bad" exceptions really come down to how pickling works. I think the SQLAlchemy exception class is broken.

If an exception class has an __init__() method which does not call BaseException.__init__() (directly or indirectly) then self.args probably will not be set properly. BaseException.__reduce__() (which is used by the pickle protocol) assumes that a copy of an exception e can be recreated by just doing

type(e)(*e.args)

For example

>>> e = ValueError("bad value")
>>> e
ValueError('bad value',)
>>> type(e)(*e.args)
ValueError('bad value',)
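To see what the pickle protocol actually stores, you can call __reduce__() directly. This sketch uses only the standard library and shows that an exception is reduced to its class plus e.args, with any instance attributes carried separately in a state dict that is applied after the constructor has run. That is why an attribute such as self.a can survive a round trip as long as the constructor call itself succeeds.

```python
import pickle

e = ValueError("bad value")
cls, args = e.__reduce__()            # no instance attributes yet: (class, args)
rebuilt = cls(*args)                  # the type(e)(*e.args) invariant

e.detail = 42                         # setting an attribute gives e a __dict__
cls2, args2, state = e.__reduce__()   # now (class, args, state dict)

# Unpickling calls cls(*args), then restores the state dict.
restored = pickle.loads(pickle.dumps(e))
```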

If this invariant does not hold then pickling/unpickling will fail. So instances of

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

can be pickled, but the result cannot be unpickled:

>>> from cPickle import loads, dumps
>>> class BadExc(Exception):
...     def __init__(self, a):
...         '''Non-optional param in the constructor.'''
...         self.a = a
...
>>> loads(dumps(BadExc(1)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

But instances of

class GoodExc1(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        Exception.__init__(self, a)
        self.a = a

or

class GoodExc2(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.args = (a,)
        self.a = a

can be successfully pickled/unpickled.
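A quick way to check a class against this invariant is to round-trip an instance through pickle, as multiprocessing does for worker results. The helper survives_pickle is a hypothetical name written for this illustration. One caveat worth checking on your own interpreter: on Python 3, BaseException.__new__ already stores the constructor's arguments in args, so the BadExc above round-trips there. The failure shown above is Python 2 behaviour; on Python 3 it still appears when __init__ passes a different argument list to Exception.__init__, as SQLAlchemy's class does.

```python
import pickle
import sys

def survives_pickle(exc):
    """Hypothetical helper: True if exc can be pickled and unpickled."""
    try:
        pickle.loads(pickle.dumps(exc))
        return True
    except Exception:
        return False

class BadExc(Exception):
    def __init__(self, a):
        self.a = a

class GoodExc1(Exception):
    def __init__(self, a):
        Exception.__init__(self, a)
        self.a = a

class GoodExc2(Exception):
    def __init__(self, a):
        self.args = (a,)
        self.a = a

good1 = survives_pickle(GoodExc1(1))
good2 = survives_pickle(GoodExc2(1))
bad = survives_pickle(BadExc(1))   # False on Python 2, True on Python 3
```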

So you should ask the developers of SQLAlchemy to fix their exception classes. In the meantime, you can probably use copy_reg.pickle() to override BaseException.__reduce__() for the troublesome classes.
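A minimal sketch of that copy_reg workaround, assuming the troublesome class keeps its constructor arguments as attributes. FussyError and reduce_fussy_error are hypothetical names; for SQLAlchemy's exceptions you would register one reducer per class and check which attributes your version actually sets. The module is copy_reg on Python 2 and copyreg on Python 3. The registration has to be in effect where the exception is pickled (for multiprocessing, the worker), so doing it at module import time is simplest.

```python
try:
    import copyreg                # Python 3
except ImportError:
    import copy_reg as copyreg    # Python 2
import pickle

class FussyError(Exception):
    """Hypothetical exception: its e.args does not match its __init__."""
    def __init__(self, statement, params, orig):
        Exception.__init__(self, "error in %r" % (statement,))
        self.statement = statement
        self.params = params
        self.orig = orig

def reduce_fussy_error(e):
    # Rebuild from the constructor's own arguments instead of e.args.
    return (FussyError, (e.statement, e.params, e.orig))

# Register the reducer; pickle consults this table before __reduce__.
copyreg.pickle(FussyError, reduce_fussy_error)

restored = pickle.loads(pickle.dumps(FussyError("TRUNCATE foo%s;", {}, None)))
```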

sbt, answered Nov 20 '22 18:11