Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reading LAZ to Dask dataframe using delayed loading

Action Reading multiple LAZ point cloud files to a Dask DataFrame.

Problem Unzipping LAZ (compressed) to LAS (uncompressed) requires a lot of memory. Varying filesizes and multiple processes created by Dask result in MemoryError's.

Attempts

I tried limiting the number of workers following the guide, but it does not seem to work.

from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)

dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')

Question How to go about loading such large amount of data in a non-standard format?

Example

Following example is my current implementation. It groups all input files per 5 to limit max 5 parallel uncompressing processes. Then repartitions and write to Parquet to enable further processing. To me this implementation seems to totally miss the point of Dask.

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))

part_size = 5000000

for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0,len(lasfiles),5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        df = df.repartition(npartitions=len(df) // part_size)
        df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')
like image 741
Tom Hemmes Avatar asked Apr 23 '26 08:04

Tom Hemmes


1 Answers

Your implementation seems mostly fine to me.

The one thing I would change here is that I would avoid the call to len(df), which will force a computation of the entire dataframe (there is no way to determine the length of the dataframe without reading through all of the files).

Just to be clear, Dask will not be able to parallelize within your load function (it has no concept of LAZ files), so your parallelism will be limited by the number of files that you have.

like image 50
MRocklin Avatar answered Apr 24 '26 22:04

MRocklin



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!