After upgrading dask.distributed to version 1.15.0, my logging stopped working.
I've used logging.config.dictConfig to initialize Python's logging facilities, and previously these settings propagated to all workers. After the upgrade they no longer do.
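For reference, here is a minimal sketch of the kind of dictConfig setup I mean (the formatter and handler shown are placeholders, not my real configuration):

import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "default"},
    },
    "root": {"handlers": ["console"], "level": "INFO"},
}

logging.config.dictConfig(LOGGING_CONFIG)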
If I call dictConfig right before every log call on every worker it works, but that's not a proper solution.
So the question is: how do I initialize logging on every worker before my computation graph starts executing, and do it only once per worker?
UPDATE:
This hack worked on a dummy example but didn't make a difference on my system:
import distributed

def init_logging():
    # logging initialization (e.g. logging.config.dictConfig(...)) happens here
    ...

client = distributed.Client()
# fire one dummy task per worker address; note the lambda has to actually call init_logging()
client.map(lambda _: init_logging(), client.ncores())
UPDATE 2:
After digging through the documentation, this fixed the problem:
client.run(init_logging)
So the question now is: Is this a proper way to solve this problem?
Dask.distributed is a centrally managed, distributed, dynamic task scheduler. The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients.
Workers keep the scheduler informed of their data and use the scheduler to gather data from other workers when necessary to perform a computation. You can start a worker with the dask-worker command line application: $ dask-worker scheduler-ip:port.
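A client process then connects to that scheduler by address; a minimal sketch (the host name and the default scheduler port 8786 below are only illustrative):

from distributed import Client

# connect to an already-running scheduler started with `dask-scheduler`
client = Client("scheduler-ip:8786")
print(client.ncores())  # one entry per connected dask-worker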
As of version 1.15.0 we now fork workers from a clean process, so changes that you make to your process prior to calling Client() won't affect forked workers. For more information, search for forkserver here: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
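A small standalone illustration of what the forkserver start method means in practice (stdlib only, Unix only; the variable name is made up for the example):

import multiprocessing

FLAG = "default"

def report():
    # With the forkserver (or spawn) start method this prints "default":
    # the child is built from a clean process, so it never sees the
    # mutation the parent made after import time. With the plain "fork"
    # method it would print "changed".
    print("child sees FLAG =", FLAG)

if __name__ == "__main__":
    FLAG = "changed"  # change made in the parent only, after import
    ctx = multiprocessing.get_context("forkserver")
    p = ctx.Process(target=report)
    p.start()
    p.join()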
Your solution of using Client.run looks good to me. Client.run is currently (as of version 1.15.0) the best way to call a function on all currently active workers.
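Concretely, the pattern looks something like this (the basicConfig call is only a stand-in for your dictConfig setup, and the work function is just for illustration):

import logging
from distributed import Client

def init_logging():
    # stand-in for your logging.config.dictConfig(...) call
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s")

def work(x):
    logging.getLogger("worker").info("processing %s", x)
    return x * 2

if __name__ == "__main__":
    client = Client()          # or Client("scheduler-ip:8786") against a real cluster
    client.run(init_logging)   # runs once on every currently active worker
    print(client.gather(client.map(work, range(4))))

Note that Client.run only reaches workers that are connected at the time of the call, so workers that join later would need it run again.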
It is worth noting that here you're setting up workers forked from the same process on a single computer; the trick you use above will not work in a distributed setting. I'm adding this note in case people come to this question asking how to handle logging with Dask in a cluster context.
Generally Dask does not move logs around. Instead, it is common that whatever mechanism you used to launch Dask handles this. Job schedulers like SGE/SLURM/Torque/PBS all do this. Cloud systems like YARN/Mesos/Marathon/Kubernetes all do this. The dask-ssh tool does this.