Integration Testing Multiple Celery Workers and a DB Backed Django API

I'm working with a Service Oriented Architecture that has multiple Celery workers (let's call them worker1, worker2, and worker3). All three workers are separate entities (i.e., separate code bases, separate repos, separate Celery instances, separate machines) and none of them are connected to a Django app.

Communicating with each of these three workers is a Django-based, MySQL-backed RESTful API.

In development, these services all run on a single Vagrant box, each acting as a separate machine running on a separate port. We have a single RabbitMQ broker for all of the Celery tasks.

A typical path through these services might look something like this: worker1 gets a message from a device, does some processing, queues up a task on worker2, which does further processing and makes a POST to the API, which writes to the MySQL DB and triggers a task on worker3, which does some other processing and makes another POST to the API which results in a MySQL write.

The services are communicating nicely, but it's very annoying to test this flow every time we make a change to any service. I really want to get some full integration tests (i.e., starting at a message sent to worker1 and going through the entire chain) in place but I'm not sure where to start. The main problems I'm facing are these:

If I queue up something on worker1, how can I possibly tell when the whole flow is over? How can I make reasonable assertions about results when I don't know if the results have even arrived?

How do I deal with DB set up/tear down? I want to delete all of the entries made during a test at the end of each test, but if I'm starting the test from outside of the Django app, I'm not sure how to efficiently clear it out. Manually deleting it and recreating it after every test seems like it might be too much overhead.
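For the teardown problem, one low-overhead alternative to dropping and recreating the database is to delete rows from just the tables the flow touches, through any DB-API connection from outside the Django app. A minimal sketch (the table names are hypothetical, and on MySQL you could swap `DELETE` for the faster `TRUNCATE`, disabling `FOREIGN_KEY_CHECKS` first if needed):

```python
def clear_tables(conn, tables):
    """Delete all rows from the given tables, child tables first.

    `conn` is any DB-API 2.0 connection (e.g. MySQLdb.connect(...)).
    Table names are interpolated directly, so they must be trusted
    values from the test suite, never user input.
    """
    cur = conn.cursor()
    for table in tables:
        cur.execute("DELETE FROM %s" % table)
    conn.commit()

# e.g. in a test fixture's teardown (illustrative table names):
#   clear_tables(mysql_conn, ["worker3_results", "worker2_events"])
```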

asked May 02 '14 by user1427661

1 Answer

Celery allows tasks to be run synchronously, so the first step is: divide the whole flow into separate tasks, fake the requests, and assert the results:

Original flow:

device --- worker1 --- worker2 --- django --- worker3 --- django

First-level integration tests:

1.      |- worker1 -|
2.                  |- worker2 -|
3.                              |- django -|
4.                                         |- worker3 -|
5.                                                     |- django -|

For each test, create a fake request or synchronous call and assert the results. Place these tests in the corresponding repository. For example, in the test for worker1, you can mock worker2 and check that it was called with the proper arguments. Then, in another test, you call worker2 and mock the HTTP request to check that it calls the right API endpoint. And so on.

Testing the whole flow will be difficult, since all the tasks are separate entities. The only way I've come up with so far is to make one fake call to worker1, set a reasonable timeout, and wait for the final result in the database. A test of this kind only tells you whether the flow works or not; it won't show you where the problem is.
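That wait-with-timeout step can be a small polling helper. In the sketch below, `check` is any callable that queries the MySQL database behind the API and returns the expected row (or `None` while it hasn't arrived); the query in the usage comment is illustrative, not a real schema:

```python
import time

def wait_for_result(check, timeout=30.0, interval=0.5):
    """Poll `check()` until it returns a non-None value or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = check()
        if result is not None:
            return result
        time.sleep(interval)
    raise TimeoutError("flow did not complete within %.1fs" % timeout)

# Usage in the end-to-end test (query and column names are assumptions):
#   row = wait_for_result(
#       lambda: fetch_row("SELECT * FROM readings WHERE device_id = 42"),
#       timeout=60,
#   )
#   assert row["status"] == "processed"
```

Keeping the interval short relative to the timeout means a passing test finishes almost as soon as the final write lands, while a broken flow fails after a bounded wait.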

answered Nov 14 '22 by Tomáš Ehrlich