 

How to stop the execution of a long process if something changes in the db?

I have a view that sends a message to a RabbitMQ queue.

message = {'origin': 'Bytes CSV',
           'data': {'csv_key': str(csv_entry.key),
                    'csv_fields': csv_fields,
                    'order_by': order_by,
                    'filters': filters}}

...

queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
                   routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))

On my consumer, I have a long process to generate a CSV.

def create(self, data):
    csv_obj = self._get_object(key=data['csv_key'])
    if csv_obj.status == CSVRequestStatus.CANCELED:
        self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
        return

    result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
    csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
    file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

    csv_obj.status = CSVRequestStatus.READY
    csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
    csv_obj.file_key = file_key
    csv_obj.ready_at = timezone.now()
    csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

    self.logger.info(f'CSV {csv_obj.name} created')

The long process happens inside self._generate_csv: self.generate_result_data returns a queryset, which is lazy, so the database work only happens when _generate_csv iterates over it.
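To see why the time is spent in _generate_csv rather than in generate_result_data, a plain Python generator behaves much like a lazy queryset: creating it runs nothing, and the work happens on iteration. In this sketch, fake_result_data is a hypothetical stand-in for generate_result_data, not the real method.

```python
def fake_result_data(log):
    # Hypothetical stand-in for generate_result_data(): like a lazy queryset,
    # no rows are fetched until something iterates over the result.
    for i in range(3):
        log.append(f"fetched row {i}")
        yield {"id": i}


log = []
result = fake_result_data(log)  # cheap: nothing has executed yet
print(log)                      # prints []
rows = list(result)             # expensive part: rows are fetched now
print(log)                      # prints ['fetched row 0', 'fetched row 1', 'fetched row 2']
```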

As you can see, if a user changes the status of the CSV request through an endpoint BEFORE the message starts being consumed, the process is skipped. My goal is to let a cancellation also take effect while self._generate_csv is running.
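One common way to get that behaviour is cooperative cancellation: while iterating over the lazy result, re-read the status every N rows and stop if it changed. A minimal sketch, assuming a hypothetical is_canceled() callable and a hypothetical CSVCanceled exception; neither exists in the original code.

```python
import csv
import io


class CSVCanceled(Exception):
    """Hypothetical exception raised when the user cancels mid-generation."""


def generate_csv(rows, fields, is_canceled, check_every=1000):
    # rows: any iterable of dicts (e.g. a lazy queryset's .values())
    # is_canceled: hypothetical callable that re-reads the status from the DB
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    for i, row in enumerate(rows):
        # Poll every `check_every` rows, not every row, to limit extra queries.
        if i % check_every == 0 and is_canceled():
            raise CSVCanceled()
        writer.writerow(row)
    return buffer.getvalue()
```

In create, is_canceled could be a small function that calls csv_obj.refresh_from_db(fields=['status']) and then compares csv_obj.status with CSVRequestStatus.CANCELED. The _generate_csv call would then be wrapped in try/except CSVCanceled so the consumer logs and returns. check_every trades responsiveness against the number of status queries.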

So far I have tried using threading, without success.
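With threading, the key constraint is that Python offers no supported way to kill a thread from outside, so the worker loop still has to check a flag itself. A background thread can do the polling and set a threading.Event, which the loop checks cheaply on every row. A minimal sketch; StatusWatcher, process_rows and fetch_status are hypothetical names. If fetch_status uses the Django ORM, the thread gets its own DB connection, which it should close when it finishes (for example with django.db.connection.close()).

```python
import threading


class StatusWatcher(threading.Thread):
    """Hypothetical helper: polls a status source in the background."""

    def __init__(self, fetch_status, canceled_value, interval=1.0):
        super().__init__(daemon=True)
        self.fetch_status = fetch_status      # hypothetical callable, e.g. a DB query
        self.canceled_value = canceled_value  # e.g. CSVRequestStatus.CANCELED
        self.interval = interval              # seconds between polls
        self.canceled = threading.Event()     # set once cancellation is observed
        self.finished = threading.Event()     # set by the worker when it is done

    def run(self):
        while not self.finished.is_set():
            if self.fetch_status() == self.canceled_value:
                self.canceled.set()
                return
            # Sleep until the next poll, waking early if stop() is called.
            self.finished.wait(self.interval)

    def stop(self):
        self.finished.set()


def process_rows(rows, watcher):
    """Process rows until done; return None if the watcher saw a cancellation."""
    processed = 0
    for row in rows:
        if watcher.canceled.is_set():  # cheap in-memory check, no DB query
            return None
        processed += 1  # stand-in for writing `row` to the CSV
    return processed
```

Typical use would be to start the watcher before the CSV loop, call stop() in a finally block, and then join() it.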

How can I achieve my goal?

Thanks a lot!

Asked Nov 06 '22 01:11 by Murilo Sitonio


1 Answer

Why don't you check out the Celery library? Using Celery with Django and a RabbitMQ broker is much easier than working with RabbitMQ queues directly.

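Celery has a built-in revoke control command to terminate a running task (the old celery.task.control module was removed in Celery 5, so use the app's control API):

>>> from celery import current_app
>>> current_app.control.revoke(task_id, terminate=True)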
  • related SO answer
  • celery docs

For your use case, you probably want something like the following snippets:

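## tasks.py
# (don't name the package "celery": it would shadow the Celery library)
from celery import shared_task

# An explicit name lets send_task("create_csv") find the task by name
@shared_task(name="create_csv", queue="my_queue")
def create_csv(message):
    # ...snip...
    pass

## main.py
from celery import uuid, current_app

def start_task(message):
    # Generate the ID up front and store it (DB or cache) so it can be revoked later
    task_id = uuid()
    current_app.send_task(
        "create_csv",
        args=[message],
        task_id=task_id,
        queue="my_queue",  # send_task doesn't know the task's own queue option
    )
    return task_id

def kill_task(task_id):
    # terminate=True also terminates the pool process running the task (SIGTERM by default)
    current_app.control.revoke(task_id, terminate=True)

## signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver

# Assumption: MyModel has `status` and `task_id` fields, and
# CSVRequestStatus (from the question) is importable from models.py
from .models import MyModel, CSVRequestStatus
from .main import kill_task

# choose the appropriate signal to listen for DB changes
@receiver(post_save, sender=MyModel)
def handler(sender, instance, **kwargs):
    # Only revoke on cancellation, not on every save (e.g. when the task marks itself READY)
    if instance.status == CSVRequestStatus.CANCELED and instance.task_id:
        kill_task(instance.task_id)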
  • Use celery.uuid() to generate task IDs, store them in the DB or cache, and use the same ID later to control the task, e.g. to request termination.
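The bookkeeping in that last point can be sketched without Celery itself: generate an ID before sending, store it keyed by the CSV request, and look it up when the user cancels. In this sketch, TaskRegistry is a hypothetical class and a dict stands in for the DB or cache. uuid.uuid4() stands in for celery.uuid(), which also returns a random UUID string. send and revoke are hypothetical callables that would wrap current_app.send_task and current_app.control.revoke.

```python
import uuid


class TaskRegistry:
    """Hypothetical mapping from CSV request key to Celery task ID."""

    def __init__(self, send, revoke):
        self._task_ids = {}    # stand-in for a DB column or cache entry
        self._send = send      # e.g. wraps current_app.send_task(...)
        self._revoke = revoke  # e.g. wraps current_app.control.revoke(...)

    def start(self, csv_key, message):
        task_id = str(uuid.uuid4())  # same string format as celery.uuid()
        # Store before sending so an immediate cancel can still find the ID.
        self._task_ids[csv_key] = task_id
        self._send(task_id, message)
        return task_id

    def cancel(self, csv_key):
        task_id = self._task_ids.pop(csv_key, None)
        if task_id is None:
            return False  # unknown key, or already canceled
        self._revoke(task_id)
        return True
```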
Answered Nov 12 '22 16:11 by eshaan7