I have a view that sends a message to a RabbitMQ queue.
message = {'origin': 'Bytes CSV',
           'data': {'csv_key': str(csv_entry.key),
                    'csv_fields': csv_fields,
                    'order_by': order_by,
                    'filters': filters}}
...
queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
                   routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))
On my consumer, I have a long process to generate a CSV.
def create(self, data):
    csv_obj = self._get_object(key=data['csv_key'])
    if csv_obj.status == CSVRequestStatus.CANCELED:
        self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
        return

    result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
    csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
    file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

    csv_obj.status = CSVRequestStatus.READY
    csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
    csv_obj.file_key = file_key
    csv_obj.ready_at = timezone.now()
    csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

    self.logger.info(f'CSV {csv_obj.name} created')
The long process happens inside self._generate_csv, because self.generate_result_data returns a queryset, which is lazy. As you can see, if a user changes the status of the csv_request through an endpoint BEFORE the message starts to be consumed, the process will not run at all. My goal is to let this cancellation also happen during the execution of self._generate_csv.
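For illustration, this is roughly (simplified, not my real code) what _generate_csv does: the query only executes once the queryset is iterated there, and the status is never re-checked after that point.

import csv
import io

def _generate_csv(self, result, csv_fields, csv_obj):
    # simplified sketch: 'result' is a lazy QuerySet, so the database query
    # only runs when the iteration below starts
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=csv_fields)
    writer.writeheader()
    for row in result.values(*csv_fields).iterator():
        writer.writerow(row)  # long-running loop; the CANCELED status is never re-checked here
    return buffer.getvalue()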
So far I have tried to use threading, but without success. How can I achieve my goal? Thanks a lot!
Why don't you check out the Celery library? Using Celery with Django and RabbitMQ as the broker is much easier than working with RabbitMQ queues directly.
Celery has a built-in revoke command to terminate an ongoing task:
>>> from celery import current_app
>>> current_app.control.revoke(task_id, terminate=True)
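If you keep the task's AsyncResult around, the same revoke can be issued through it (just an alternative form, assuming your Celery app is configured):

from celery.result import AsyncResult

# terminate=True also kills the task if it is already running,
# provided the worker pool supports it (e.g. the default prefork pool)
AsyncResult(task_id).revoke(terminate=True)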
For your use case, you probably want something like (code snippets):
## celery/tasks.py
from .celery import app  # your project's Celery() instance; adjust the import to wherever it is defined

@app.task(name="create_csv", queue="my_queue")  # explicit name so send_task("create_csv") can find it
def create_csv(message):
    # ...snip... build the CSV here (the logic from the consumer's create())
    pass
## main.py
from celery import current_app, uuid  # uuid() generates task IDs you can store up front

def start_task(task_id, message):
    current_app.send_task(
        "create_csv",      # matches the explicit task name above
        args=[message],
        task_id=task_id,
    )

def kill_task(task_id):
    current_app.control.revoke(task_id, terminate=True)
## signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver

from .main import kill_task
from .models import MyModel

# choose the appropriate signal to listen for the DB change
@receiver(post_save, sender=MyModel)
def handler(sender, instance, **kwargs):
    # in practice, only revoke when the status was actually changed to "canceled"
    kill_task(instance.task_id)
Use celery.uuid to generate task IDs, store them in the DB or a cache, and use the same task ID to control the task, i.e. to request termination.
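For example, the view that queues the CSV could look roughly like this (a sketch only; the CSVRequest model, its task_id field, and the view name are assumed, not taken from your code):

## views.py (hypothetical sketch tying the pieces together)
from django.http import JsonResponse

from celery import uuid

from .main import start_task
from .models import CSVRequest  # assumed model with a task_id field

def request_csv_export(request):
    csv_entry = CSVRequest.objects.create()
    task_id = uuid()                 # generate the Celery task ID up front
    csv_entry.task_id = task_id      # persist it so the cancel endpoint / signal can revoke later
    csv_entry.save(update_fields=['task_id'])

    message = {'csv_key': str(csv_entry.key)}  # plus csv_fields, order_by, filters as in the question
    start_task(task_id, message)
    return JsonResponse({'task_id': task_id})

A cancel endpoint then only has to flip the status on the model; the post_save signal above picks up the change and calls kill_task.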