I'm trying to run some dramatiq actors from my Falcon API method, like this:
def on_post(self, req, resp):
begin_id = int(req.params["begin_id"])
count = int(req.params["count"])
for page_id in range(begin_id, begin_id + count):
process_vk_page.send(f"https://vk.com/id{page_id}")
resp.status = falcon.HTTP_200
My code gets to "send" method, goes through the loop without any problems. But where are no new tasks in the queue! Actor itself is not called, and "default" queue in my broker is empty. If I set custom queue, it is still empty. My actor looks like this:
@dramatiq.actor(broker=broker)
def process_vk_page(link: str):
pass
Where broker is
broker = RabbitmqBroker(url="amqp://guest:guest@rabbitmq:5672")
RabbitMQ logs tell that it is connecting fine
I've done some additional research in debugger. It gets the message (which is meant to be sent to broker) fine, and broker.enqueue in Actor.send_with_options() returns no exceptions, although I can't really get it's internal logic. I don't really know why it fails, but it is definitely RabbitmqBroker.enqueue() which is causing the problem.
Broker is RabbitMQ 3.8.2 on Erlang 22.2.1, running in Docker from rabbitmq Docker Hub image with default settings. Dramatiq version is 1.7.0.
In RabbitMQ logs there are only connections to broker when app starts and disconnections when I turn it off, like this:
2020-01-05 08:25:35.622 [info] <0.594.0> accepting AMQP connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672)
2020-01-05 08:25:35.627 [info] <0.594.0> connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
2020-01-05 08:28:35.625 [error] <0.597.0> closing AMQP connection <0.597.0> (172.20.0.1:51246 -> 172.20.0.3:5672):
missed heartbeats from client, timeout: 60s
Broker is defined in __init__.py
of main package and imported in subpackages. I'm not sure that specifying the same broker instance in decorators of all the functions is fine, but where are nothing in docs which bans it. I guess it doesn't matter, since if I create new broker for each Actor it still doesn't work.
I've tried to set Redis as broker, but I still get the same issue.
What might be the reason for this?
Most likely the issue is that you're not telling the workers which broker to use, since you're not declaring a default broker.
You haven't mentioned how your files are laid out in your application, but, assuming your broker is defined as broker
inside tasks.py
, then you would have to let your workers know about it like so:
dramatiq tasks:broker
See the examples at the end of dramatiq --help
for more information and patterns.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With