memory usage when indexing a large dask dataframe on a single multicore machine


I am trying to turn the Wikipedia CirrusSearch dump into a Parquet-backed Dask dataframe indexed by title, on a 450G 16-core GCP instance. CirrusSearch dumps come as a single JSON-lines formatted file. The English Wikipedia dump contains 5M records and is 12G compressed and 90+G expanded. An important detail is that the records are not completely flat.

The simplest way to do this would be

import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op

blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'

source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .set_index('title', npartitions=npartitions)
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)

The first problem is that, with the default scheduler, this utilizes only one core. That problem can be avoided by explicitly using either the distributed or multiprocessing scheduler.
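For example, the scheduler can be selected explicitly before triggering the computation. This is a minimal sketch of the two options just mentioned (not part of the original question; pick one of the two):

import dask

# Either: the multiprocessing scheduler, so the GIL-bound JSON parsing
# can use all cores
dask.config.set(scheduler='processes')

# Or: a local distributed cluster; creating a Client makes it the default
# scheduler and also provides a diagnostic dashboard
# from dask.distributed import Client
# client = Client()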

The bigger problem with all schedulers and settings I have tried is memory usage. It appears that dask tries to load the entire dataframe into memory when indexing. Even 450G is not enough RAM for this.

  • How can I reduce the memory usage for this task?
  • How can I estimate the minimum memory required without resorting to trial and error?
  • Is there a better approach?
asked Jun 29 '18 by Daniel Mahler




1 Answer

Why is Dask using only one core?

The JSON parsing part of this is probably GIL-bound, so you want to use processes. However, when you finally compute something you're using dataframes, which generally assume that computations release the GIL (this is common in Pandas), so Dask uses the threading backend by default. If you are mostly bound by the GIL-bound parsing stage then you probably want to use the multiprocessing scheduler. This should solve your problem:

dask.config.set(scheduler='multiprocessing')

How do I avoid memory use during the set_index phase?

Yeah, the set_index computation requires the full dataset. This is a hard problem. If you're using the single-machine scheduler (which you appear to be doing) then it should be using an out-of-core data structure to do this sorting process. I'm surprised that it's running out of memory.
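One thing that may help here, offered as a suggestion rather than part of the original answer: under the distributed scheduler each worker has an explicit memory limit and spills excess data to its local directory on disk, so modest per-worker limits can keep the shuffle from exhausting RAM. The worker counts, memory limits, and local_directory path below are illustrative assumptions for a 450G, 16-core machine, not tuned values:

from dask.distributed import Client, LocalCluster

# Illustrative settings; workers spill intermediate data to local_directory
# as they approach their memory_limit
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=2,
    memory_limit='40GB',               # per worker, well under 450G in total
    local_directory='/tmp/dask-spill',
)
client = Client(cluster)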

How can I estimate the minimum memory required without resorting to trial and error?

Unfortunately it's difficult to estimate the size of JSON-like data in memory in any language. This is much easier with flat schemas.
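One rough way to get a ballpark figure, sketched here rather than taken from the original answer: parse a sample of the dump with pandas, measure its deep memory usage, and extrapolate to the full 5M records. The sample size is arbitrary, and json_normalize flattens nested fields differently than the bag/dataframe pipeline does, so treat the result as an order-of-magnitude estimate only:

import json
import pandas as pd

sample = []
with open(source) as f:              # 'source' as defined in the question
    for line in f:
        rec = json.loads(line)
        if 'title' in rec:           # skip the CirrusSearch metadata lines
            sample.append(rec)
        if len(sample) >= 10_000:
            break

df = pd.json_normalize(sample)
per_record = df.memory_usage(deep=True).sum() / len(df)
print(f"~{per_record * 5_000_000 / 2**30:.0f} GiB estimated for 5M records")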

Is there a better approach?

This doesn't solve your core issue, but you might consider staging data in Parquet format before trying to sort everything. Then try doing dd.read_parquet(...).set_index(...).to_parquet(...) in isolation. This might help to isolate some costs.
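A sketch of that staging approach, reusing the names from the question; the staged path is illustrative. Stage 1 writes unsorted Parquet without any shuffle, and stage 2 performs the sort by title in isolation:

staged = f'{path}{lang}{wiki}-{date}-staged.pq'

# Stage 1: parse and write unsorted Parquet
(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .to_parquet(staged, **parquetopts)
)

# Stage 2: read the staged Parquet back and sort by title separately
(
 ddf
 .read_parquet(staged, engine='fastparquet')
 .set_index('title')
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)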

answered Oct 25 '22 by MRocklin