
Progress reporting on dask's set_index

I am trying to wrap a progress indicator around the entire script. However, set_index(..., compute=False) still runs tasks on the scheduler, which I can observe in the web interface.

How do I report on the progress of the set_index step?

import dask.dataframe as dd
from dask.distributed import Client, progress

if __name__ == '__main__':

  with Client() as client:

    df = dd.read_csv('big.csv')

    # I can see on the web interface that something is happening.
    # This blocks 20-30s on this particular CSV.
    df = df.set_index('id', compute=False)

    # Progress reporting works from here
    out = client.compute(
      df
    )
    progress(out)

    # out.result()
    # ...
asked Nov 07 '22 at 15:11 by kadrach


1 Answer

Unfortunately, there's no easy way to do this. set_index eagerly calls compute internally, even when you pass compute=False, because it needs to scan your data to decide what the boundaries (divisions) of the new partitions should be. There's no way to control that internal compute call or attach a progress bar to it. If you think this would be worth exposing to users, you could open an issue to discuss it further.
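
To see why that scan can't be skipped: divisions are npartitions + 1 sorted boundary values for the new index, and choosing them well means estimating quantiles of the whole column, which touches every partition. The sketch below illustrates the idea in plain Python, assuming each partition is just a list of index values. approx_divisions and its on_progress callback are hypothetical names, and pooling random samples is a simplified stand-in for dask's partition_quantiles, which uses a more careful weighted-percentile merge. The callback marks where a progress bar could hook in, which is exactly what the internal compute in set_index doesn't expose.

```python
import random
from typing import Callable, List, Optional, Sequence


def approx_divisions(
    partitions: Sequence[Sequence[int]],
    npartitions: int,
    samples_per_partition: int = 100,
    on_progress: Optional[Callable[[int, int], None]] = None,
    seed: int = 0,
) -> List[int]:
    """Estimate npartitions + 1 sorted boundary values (divisions) for an index column.

    Hypothetical helper: each element of `partitions` is a list of index values.
    """
    rng = random.Random(seed)
    sample: List[int] = []
    lo: Optional[int] = None
    hi: Optional[int] = None
    total = len(partitions)
    for done, part in enumerate(partitions, start=1):
        if part:
            # Every partition has to be read once; this is the scan set_index can't skip.
            k = min(samples_per_partition, len(part))
            sample.extend(rng.sample(list(part), k))
            lo = min(part) if lo is None else min(lo, min(part))
            hi = max(part) if hi is None else max(hi, max(part))
        if on_progress is not None:
            # The hook a progress bar would attach to.
            on_progress(done, total)
    if not sample:
        raise ValueError("no data to compute divisions from")
    sample.sort()
    n = len(sample)
    # Evenly spaced order statistics of the pooled sample become the boundaries.
    divisions = [sample[(n - 1) * j // npartitions] for j in range(npartitions + 1)]
    # The outermost boundaries must match the true min and max of the data.
    divisions[0], divisions[-1] = lo, hi
    return divisions


if __name__ == "__main__":
    # Three interleaved partitions covering 0..999.
    parts = [list(range(r, 1000, 3)) for r in range(3)]
    divs = approx_divisions(
        parts, 4, on_progress=lambda done, total: print(f"scanned {done}/{total} partitions")
    )
    print(divs)  # 5 sorted boundaries, the first 0 and the last 999
```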

Alternatively (the hard way), you could calculate the divisions of the DataFrame yourself, showing a progress bar while doing so, and then pass them to set_index so that it doesn't need to run that compute itself.
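
Passing divisions helps because, once the boundaries are fixed, deciding which output partition a row belongs to is a local lookup (a binary search against the divisions), so nothing has to be computed up front to build the graph. Here is a minimal plain-Python sketch of that routing step, assuming dask's convention that partition i holds values in [divisions[i], divisions[i+1]) and the last partition also includes the top boundary. partition_for and route_rows are hypothetical helpers, not dask functions.

```python
from bisect import bisect_right
from typing import Any, Dict, List, Sequence


def partition_for(value, divisions: Sequence) -> int:
    """Return the index of the output partition that `value` belongs to."""
    if value < divisions[0] or value > divisions[-1]:
        raise ValueError(f"{value!r} lies outside the divisions")
    # bisect_right finds the first boundary strictly greater than value.
    i = bisect_right(divisions, value) - 1
    # Clamp so the top boundary value lands in the last partition.
    return min(i, len(divisions) - 2)


def route_rows(rows: List[Dict[str, Any]], key: str, divisions: Sequence) -> List[List[Dict[str, Any]]]:
    """Group rows into len(divisions) - 1 output partitions by their `key` value."""
    out: List[List[Dict[str, Any]]] = [[] for _ in range(len(divisions) - 1)]
    for row in rows:
        out[partition_for(row[key], divisions)].append(row)
    return out


if __name__ == "__main__":
    divisions = [0, 10, 20, 30]  # three output partitions
    rows = [{"id": 5}, {"id": 30}, {"id": 10}, {"id": 12}]
    print(route_rows(rows, "id", divisions))
    # [[{'id': 5}], [{'id': 10}, {'id': 12}], [{'id': 30}]]
```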

The downside of this approach is that you're replicating code dask already has, and dask's own method has extra logic for repartitioning, detecting pre-sorted values to take a fast path, and so on. But in simple cases, something like this might work (note that it relies on a non-public, undocumented API, so it could change without warning):

import dask
from dask.distributed import Client, progress

if __name__ == "__main__":

    with Client() as client:

        df = dask.datasets.timeseries()

        # Calculate divisions manually to show a progress bar
        from dask.dataframe.partitionquantiles import partition_quantiles

        divisions_ddf = partition_quantiles(df["id"], df.npartitions)
        future = client.compute(divisions_ddf)
        print("Computing divisions")
        progress(future)
        divisions = future.result().tolist()

        df = df.set_index("id", divisions=divisions, compute=False)

        out = client.compute(df)
        progress(out)

        # out.result()
        # ...
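
The pre-sorted fast path mentioned above boils down to a check on per-partition minimums and maximums: if the partitions' value ranges are already ordered and don't overlap, the divisions can be read straight off those extremes and no row has to move between partitions (each one only needs a local sort). Below is a simplified plain-Python sketch of that check, with lists standing in for partitions. presorted_divisions is a hypothetical name, and this is not dask's actual implementation.

```python
from typing import List, Optional, Sequence


def presorted_divisions(partitions: Sequence[Sequence[int]]) -> Optional[List[int]]:
    """Return divisions that keep every row in its current partition, or None.

    Applies only when all partitions are non-empty and their value ranges are
    ordered and non-overlapping (max of partition i < min of partition i + 1).
    """
    if not partitions or any(len(p) == 0 for p in partitions):
        return None  # fall back to the full quantile-based path
    mins = [min(p) for p in partitions]
    maxes = [max(p) for p in partitions]
    # Strict inequality: partition i covers [divisions[i], divisions[i + 1]),
    # so a value shared across a boundary would belong to the next partition.
    if all(mx < mn for mx, mn in zip(maxes[:-1], mins[1:])):
        return mins + [maxes[-1]]
    return None


if __name__ == "__main__":
    print(presorted_divisions([[3, 1, 2], [7, 5], [9, 12]]))  # [1, 5, 9, 12]
    print(presorted_divisions([[1, 6], [5, 9]]))  # None: ranges overlap
```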
answered Nov 27 '22 at 06:11 by Gabe Joseph