We have the following project structure right now: a web-server module and an analytics module.
We decided to keep these modules completely independent and run them in different Docker containers. When a query from a user arrives at the web-server, it sends another query to the analytics module to get the recommendations.
For the recommendations to be consistent, we need to do some background calculations periodically and also when, for instance, new users register in our system. Some background tasks are also connected purely with the web-server logic. For this purpose we decided to use a distributed task queue, e.g. Celery.
There are several possible scenarios of task creation and execution. So far I see 3 rather weird possibilities for using Celery here:
I. Celery in a separate container that does everything
This way we lose isolation, as the functionality is shared between the Celery container and the other containers.
II. Celery in a separate container that does much less
Same as I, but the tasks are now just requests to the web-server and the analytics module, which handle them asynchronously, with the result polled inside the task until it is ready.
This way we get the benefits of having a broker, but all heavy computations are moved out of the Celery workers.
III. A separate Celery worker in each container
This way, tasks scheduled at the web-server could be executed in the analytics module. However, we would still have to share the task code across the containers or use dummy tasks, and, additionally, we would need to run Celery workers in each container.
What is the best way to do this, or should the logic be changed completely, e.g. by moving everything into one container?
First, let's clarify the difference between the celery library (which you get with pip install celery or in your setup.py) and the celery worker - the actual process that dequeues tasks from the broker and handles them. Of course, you might want to have multiple workers/processes (for example, to separate different kinds of tasks onto different workers).
Let's say you have two tasks: calculate_recommendations_task and periodic_update_task, and you want to run each of them on a separate worker, i.e. recommendation_worker and periodic_worker.
Another process will be celery beat, which simply enqueues the periodic_update_task into the broker every x hours.
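For concreteness, here is a minimal sketch (my assumption, not the asker's actual code) of what the task module inside the recommendation image could look like, matching the -A recommendation.celeryapp:app argument used in the compose file below. The task bodies are placeholders:

# recommendation/celeryapp.py - a minimal sketch
from celery import Celery

app = Celery('recommendation', broker='amqp://guest:guest@rabbit:5672/vhost')
app.config_from_object('config.celeryconfig')


# registering under plain string names lets the web server call
# send_task('calculate_recommendations_task') without importing this module
@app.task(name='calculate_recommendations_task')
def calculate_recommendations_task(a, b, c):
    # placeholder for the heavy analytics computation
    return [a, b, c]


@app.task(name='periodic_update_task')
def periodic_update_task():
    # placeholder for the periodic background recalculation
    pass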
In addition, let's say you have a simple web server implemented with bottle.
I'll assume you want to run the Celery broker & backend with Docker too, and I'll pick the commonly recommended combination for Celery - RabbitMQ as the broker and Redis as the backend.
So now we have 6 containers; I'll write them in a docker-compose.yml:
version: '2'
services:
  rabbit:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      - RABBITMQ_DEFAULT_VHOST=vhost
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
  redis:
    image: library/redis
    command: redis-server /usr/local/etc/redis/redis.conf
    expose:
      - "6379"
    ports:
      - "6379:6379"
  recommendation_worker:
    image: recommendation_image
    command: celery worker -A recommendation.celeryapp:app -l info -Q recommendation_worker -c 1 -n recommendation_worker@%h -Ofair
  periodic_worker:
    image: recommendation_image
    command: celery worker -A recommendation.celeryapp:app -l info -Q periodic_worker -c 1 -n periodic_worker@%h -Ofair
  beat:
    image: recommendation_image
    command: <not sure>
  web:
    image: web_image
    command: python web_server.py
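The beat command is left as <not sure> above. A plausible way to fill it, assuming the periodic schedule is declared in the Celery config (see the CELERYBEAT_SCHEDULE sketch at the end of this answer; the --schedule path is just a writable location for beat's local state file), would be:

  beat:
    image: recommendation_image
    command: celery beat -A recommendation.celeryapp:app -l info --schedule=/tmp/celerybeat-schedule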
Both Dockerfiles, which build the recommendation_image and the web_image, should install the celery library. Only the recommendation_image should contain the task code, because its workers are going to handle those tasks:
RecommendationDockerfile:
FROM python:2.7-wheezy
RUN pip install celery
COPY tasks_src_code .
WebDockerfile:
FROM python:2.7-wheezy
RUN pip install celery
RUN pip install bottle
COPY web_src_code .
The other images (rabbitmq:3-management & library/redis) are available from Docker Hub and will be pulled automatically when you run docker-compose up.
Now here is the thing: in your web server you can trigger Celery tasks by their string names and pull their results by task id (without sharing the task code). web_server.py:
import bottle
from bottle import request
from celery import Celery
from celery.result import AsyncResult

app = bottle.Bottle()

rabbit_path = 'amqp://guest:guest@rabbit:5672/vhost'
celeryapp = Celery('recommendation', broker=rabbit_path)
# routing (and the result backend, if set there) come from config/celeryconfig.py,
# which must be importable inside this image as well
celeryapp.config_from_object('config.celeryconfig')


@app.route('/trigger_task', method='POST')
def trigger_task():
    # trigger the task by its string name - no task code is shared with the workers
    r = celeryapp.send_task('calculate_recommendations_task', args=(1, 2, 3))
    return r.id


@app.route('/trigger_task_res', method='GET')
def trigger_task_res():
    task_id = request.query['task_id']
    result = AsyncResult(task_id, app=celeryapp)
    if result.ready():
        return str(result.get())
    return result.state


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
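As a quick usage sketch, assuming you publish the web container's port (e.g. add ports: - "8080:8080" to the web service in the compose file) and the bottle app listens on 8080 as above:

# trigger the task; the response body is the task id
curl -X POST http://localhost:8080/trigger_task
# poll for the result/state using the returned id
curl "http://localhost:8080/trigger_task_res?task_id=<id-from-previous-response>"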
The last file is config.celeryconfig.py:
CELERY_ROUTES = {
    'calculate_recommendations_task': {
        'exchange': 'recommendation_worker',
        'exchange_type': 'direct',
        'routing_key': 'recommendation_worker'
    }
}

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
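A few things the snippets above rely on but don't show. Here is a sketch of how they could be appended to config.celeryconfig.py (the Redis URL, the 6-hour interval and the schedule entry name are my assumptions): a result backend so the web server can actually poll results, a route so periodic_update_task lands on the queue the periodic_worker consumes, and the CELERYBEAT_SCHEDULE that celery beat reads:

from datetime import timedelta

# result backend (the redis service from docker-compose) - needed for AsyncResult polling
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'

# route the periodic task to the queue that periodic_worker consumes (-Q periodic_worker)
CELERY_ROUTES['periodic_update_task'] = {
    'exchange': 'periodic_worker',
    'exchange_type': 'direct',
    'routing_key': 'periodic_worker'
}

# what celery beat enqueues and how often (the interval is a placeholder)
CELERYBEAT_SCHEDULE = {
    'periodic-recommendation-update': {
        'task': 'periodic_update_task',
        'schedule': timedelta(hours=6),
    },
}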