I am trying to turn the Wikipedia CirrusSearch dump into a Parquet-backed Dask DataFrame indexed by title, on a 450 GB, 16-core GCP instance. CirrusSearch dumps come as a single JSON-lines-formatted file. The English Wikipedia dump contains about 5M records and is 12 GB compressed and 90+ GB expanded. An important detail is that the records are not completely flat.
The simplest way to do this would be:
import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op

blocksize = 2**24                 # 16 MiB of text per bag partition
npartitions = 'auto'
parquetopts = dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path = './'
source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
    db
    .read_text(source, blocksize=blocksize)     # one JSON document per line
    .map(json.loads)
    .filter(tz.flip(op.contains, 'title'))      # keep only records that have a 'title' key
    .to_dataframe()
    .set_index('title', npartitions=npartitions)
    .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)
The first problem is that with the default scheduler this utilizes only one core. That problem can be avoided by explicitly using either the distributed or the multiprocessing scheduler.
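For example (a minimal sketch, with illustrative worker and thread counts rather than anything from the original setup), the distributed scheduler can be started locally with one single-threaded worker process per core:

# Run the distributed scheduler on this machine, one process per core
from dask.distributed import Client
client = Client(n_workers=16, threads_per_worker=1)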
The bigger problem, with all schedulers and settings I have tried, is memory usage. It appears that Dask tries to load the entire dataframe into memory when indexing. Even 450 GB is not enough RAM for this.
The JSON parsing part of this is probably GIL-bound, so you want to use processes. However, when you finally compute something you're using dataframes, which generally assume that computations release the GIL (this is common in Pandas), so Dask uses the threaded scheduler by default. If you are mostly bound by the GIL-heavy parsing stage, then you probably want to use the multiprocessing scheduler. This should solve your problem:
dask.config.set(scheduler='multiprocessing')
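As a usage note (a sketch that reuses source, blocksize, and the other names defined in the question's snippet), the same setting can also be scoped to a single computation with a context manager instead of being applied globally:

import dask

# Only work triggered inside this block uses the multiprocessing scheduler
with dask.config.set(scheduler='multiprocessing'):
    (
        db
        .read_text(source, blocksize=blocksize)
        .map(json.loads)
        .filter(tz.flip(op.contains, 'title'))
        .to_dataframe()
        .set_index('title', npartitions=npartitions)
        .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
    )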
Yeah, the set_index computation requires the full dataset. This is a hard problem. If you're using the single-machine scheduler (which you appear to be doing), then it should be using an out-of-core data structure for this sorting process; I'm surprised that it's running out of memory.
Unfortunately, it's difficult to estimate the in-memory size of JSON-like data in any language. This is much easier with flat schemas.
This doesn't solve your core issue, but you might consider staging the data in Parquet format before trying to sort everything, and then running dd.read_parquet(...).set_index(...).to_parquet(...) in isolation. This might help to isolate some of the costs.
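A minimal sketch of that staged approach, reusing the names from the question's snippet; the staging path below is just an illustrative choice:

# Stage 1: parse the dump and write unsorted Parquet (no set_index),
# so no global shuffle of the 90+ GB dataset is needed yet.
staged = f'{lang}{wiki}-{date}-cirrussearch-staged.pq'   # illustrative staging path
(
    db
    .read_text(source, blocksize=blocksize)
    .map(json.loads)
    .filter(tz.flip(op.contains, 'title'))
    .to_dataframe()
    .to_parquet(staged, **parquetopts)
)

# Stage 2: read the staged Parquet back and do the sort/index step in isolation.
import dask.dataframe as dd
(
    dd
    .read_parquet(staged, engine='fastparquet')
    .set_index('title')
    .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)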