Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dramatiq doesn't add tasks to the queue

I'm trying to run some dramatiq actors from my Falcon API method, like this:

def on_post(self, req, resp):
    begin_id = int(req.params["begin_id"])
    count = int(req.params["count"])

    for page_id in range(begin_id, begin_id + count):
        process_vk_page.send(f"https://vk.com/id{page_id}")

    resp.status = falcon.HTTP_200

My code gets to "send" method, goes through the loop without any problems. But where are no new tasks in the queue! Actor itself is not called, and "default" queue in my broker is empty. If I set custom queue, it is still empty. My actor looks like this:

@dramatiq.actor(broker=broker)
def process_vk_page(link: str):
   pass

Where broker is

broker = RabbitmqBroker(url="amqp://guest:guest@rabbitmq:5672")

RabbitMQ logs tell that it is connecting fine

I've done some additional research in debugger. It gets the message (which is meant to be sent to broker) fine, and broker.enqueue in Actor.send_with_options() returns no exceptions, although I can't really get it's internal logic. I don't really know why it fails, but it is definitely RabbitmqBroker.enqueue() which is causing the problem.

Broker is RabbitMQ 3.8.2 on Erlang 22.2.1, running in Docker from rabbitmq Docker Hub image with default settings. Dramatiq version is 1.7.0.

In RabbitMQ logs there are only connections to broker when app starts and disconnections when I turn it off, like this:

2020-01-05 08:25:35.622 [info] <0.594.0> accepting AMQP connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672)
2020-01-05 08:25:35.627 [info] <0.594.0> connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
2020-01-05 08:28:35.625 [error] <0.597.0> closing AMQP connection <0.597.0> (172.20.0.1:51246 -> 172.20.0.3:5672):
missed heartbeats from client, timeout: 60s

Broker is defined in __init__.py of main package and imported in subpackages. I'm not sure that specifying the same broker instance in decorators of all the functions is fine, but where are nothing in docs which bans it. I guess it doesn't matter, since if I create new broker for each Actor it still doesn't work.

I've tried to set Redis as broker, but I still get the same issue.

What might be the reason for this?

like image 853
keddad Avatar asked Jan 02 '20 21:01

keddad


1 Answers

Most likely the issue is that you're not telling the workers which broker to use, since you're not declaring a default broker.

You haven't mentioned how your files are laid out in your application, but, assuming your broker is defined as broker inside tasks.py, then you would have to let your workers know about it like so:

dramatiq tasks:broker

See the examples at the end of dramatiq --help for more information and patterns.

like image 133
Bogdan Popa Avatar answered Sep 19 '22 11:09

Bogdan Popa