I am working on a Django web application that queries a PostgreSQL database. When I run the queries concurrently using Python's threading module, I get DoesNotExist errors for the queried items. These errors do not occur when the same queries run sequentially.
Here is a unit test I wrote to demonstrate the unexpected behavior:
import threading

from django.test import TestCase

# City is the application's model; its import is omitted in the original post.


class ThreadingTest(TestCase):
    fixtures = ['demo_city']

    def test_sequential_requests(self):
        """
        A very simple request to the database, made sequentially.

        A fixture for the cities has been loaded above, so there should be
        six cities in the test database now. We will make a request for
        each one of the cities sequentially.
        """
        for number in range(1, 7):
            c = City.objects.get(pk=number)
            self.assertEqual(c.pk, number)

    def test_threaded_requests(self):
        """
        Now, to test the threaded behavior, we will spawn a thread for
        retrieving each city from the database.
        """
        cities = []

        def do_requests(number):
            cities.append(City.objects.get(pk=number))

        threads = [threading.Thread(target=do_requests, args=(n,)) for n in range(1, 7)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self.assertNotEqual(cities, [])
As you can see, the first test performs the database requests sequentially, and they work without any problem. The second test performs exactly the same requests, but each one runs in its own thread. This one fails: every thread raises a DoesNotExist exception.
The output of running these unit tests looks like this:
test_sequential_requests (cesta.core.tests.threadbase.ThreadingTest) ... ok
test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ...
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
    cities.append(City.objects.get(pk=number))
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
    return self.get_query_set().get(*args, **kwargs)
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
    % self.model._meta.object_name)
DoesNotExist: City matching query does not exist.
... the other threads produce similar output ...
Exception in thread Thread-6:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
    cities.append(City.objects.get(pk=number))
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
    return self.get_query_set().get(*args, **kwargs)
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
    % self.model._meta.object_name)
DoesNotExist: City matching query does not exist.
FAIL
======================================================================
FAIL: test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 52, in test_threaded_requests
    self.assertNotEqual(cities, [])
AssertionError: [] == []
----------------------------------------------------------------------
Ran 2 tests in 0.278s
FAILED (failures=1)
Destroying test database for alias 'default' ('test_cesta')...
Remember that all of this is happening on PostgreSQL, which is supposed to be thread-safe, not on SQLite or a similar database. The tests were also run against PostgreSQL.
At this point I am completely lost as to what could be failing. Any ideas or suggestions?
Thanks!
EDIT: I wrote a small view to check whether this works outside of the tests. Here is the code of the view:
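import Queue  # Python 2 module name (the post runs on Python 2.6)
import threading

from django.shortcuts import render_to_response
from django.template import RequestContext


def get_cities(request):
    queue = Queue.Queue()

    def get_async_cities(q, n):
        city = City.objects.get(pk=n)
        q.put(city)

    threads = [threading.Thread(target=get_async_cities, args=(queue, number)) for number in range(1, 5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All workers have finished, so draining the queue until it is empty is safe.
    cities = list()
    while not queue.empty():
        cities.append(queue.get())
    return render_to_response('async/cities.html', {'cities': cities},
                              context_instance=RequestContext(request))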
(Please disregard the folly of writing application logic inside the view code. This is only a proof of concept and would never be in the real app.)
The result is that the code works fine: the requests are made successfully in threads, and the view shows the cities when its URL is called.
So I think making queries from threads is only a problem when testing the code. In production, it works without any problem.
Any useful suggestions for testing this kind of code successfully?
Try using TransactionTestCase:
class ThreadingTest(TransactionTestCase):
TestCase wraps each test in a transaction that is rolled back at the end, so the fixture data is never committed to the database. Each thread works through its own database connection, and that connection cannot see data that has not been committed yet. See the description here: https://docs.djangoproject.com/en/dev/topics/testing/?from=olddocs#django.test.TransactionTestCase
TransactionTestCase and TestCase are identical except for the manner in which the database is reset to a known state and the ability for test code to test the effects of commit and rollback. A TransactionTestCase resets the database before the test runs by truncating all tables and reloading initial data. A TransactionTestCase may call commit and rollback and observe the effects of these calls on the database.
This part of the documentation makes it clearer:
class LiveServerTestCase(TransactionTestCase):
    """
    ...
    Note that it inherits from TransactionTestCase instead of TestCase because
    the threads do not share the same transactions (unless if using in-memory
    sqlite) and each thread needs to commit all their transactions so that the
    other thread can see the changes.
    """
Now, the transaction has not been committed inside a TestCase, hence the changes are not visible to the other thread.
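Why threads do not share a transaction can be illustrated outside Django. The following is a minimal sketch, not Django's actual code: it assumes a hypothetical get_connection() helper that stores one connection per thread in threading.local, with an in-memory SQLite database standing in for PostgreSQL. Each worker thread ends up with its own connection object, and a transaction always belongs to exactly one connection.

```python
import sqlite3
import threading

# Hypothetical per-thread storage; it only imitates the idea of handing each
# thread its own database connection.
_thread_state = threading.local()


def get_connection():
    """Return the calling thread's own connection, opening it on first use."""
    if not hasattr(_thread_state, "conn"):
        _thread_state.conn = sqlite3.connect(":memory:")
    return _thread_state.conn


def connections_per_thread(n_threads=3):
    """Start n_threads workers and return the connection object each one got."""
    seen = []
    lock = threading.Lock()

    def worker():
        conn = get_connection()
        # A second call from the same thread returns the same object.
        assert conn is get_connection()
        with lock:
            seen.append(conn)  # keep a reference so the objects stay alive
        conn.close()  # close the connection in the thread that owns it

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

Calling connections_per_thread(3) returns three different connection objects, one per worker, so nothing a test does on the main thread's connection is automatically part of what the workers see.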
This sounds like an issue with transactions. If you're creating elements within the current request (or test), they're almost certainly in an uncommitted transaction that isn't accessible from the separate connection in the other thread. You probably need to manage your transactions manually to get this to work.
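The effect of an uncommitted transaction on a second connection can be reproduced with the standard library alone. This is a sketch under stated assumptions: a temporary SQLite file stands in for PostgreSQL, and the city table and the helper names (count_cities_in_new_connection, run_in_thread, demo) are made up for illustration. A row inserted on one connection stays invisible to a thread using another connection until the first connection commits.

```python
import os
import sqlite3
import tempfile
import threading


def count_cities_in_new_connection(db_path, results, key):
    """Open a separate connection, as a worker thread would, and count rows."""
    conn = sqlite3.connect(db_path)
    try:
        results[key] = conn.execute("SELECT COUNT(*) FROM city").fetchone()[0]
    finally:
        conn.close()


def run_in_thread(db_path, results, key):
    """Run the count in a worker thread and wait for it to finish."""
    t = threading.Thread(target=count_cities_in_new_connection,
                         args=(db_path, results, key))
    t.start()
    t.join()


def demo():
    # A file-based database is needed: every in-memory SQLite connection
    # would get its own private database.
    fd, db_path = tempfile.mkstemp(suffix=".sqlite3")
    os.close(fd)
    results = {}
    writer = sqlite3.connect(db_path)
    try:
        writer.execute("CREATE TABLE city (id INTEGER PRIMARY KEY, name TEXT)")
        writer.commit()
        # The INSERT opens a transaction on the writer connection only.
        writer.execute("INSERT INTO city (name) VALUES ('Madrid')")
        run_in_thread(db_path, results, "before_commit")
        # Managing the transaction manually: commit, then look again.
        writer.commit()
        run_in_thread(db_path, results, "after_commit")
    finally:
        writer.close()
        os.remove(db_path)
    return results
```

Here demo() reports zero rows for "before_commit" and one row for "after_commit". In the Django tests the same visibility comes from using TransactionTestCase, which does not wrap the test in a transaction that is only rolled back at the end.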