 

dask bag not using all cores? alternatives?

I have a Python script which does the following: (i) it takes an input file of data (usually in nested JSON format), (ii) passes the data line by line to another function which transforms it into the desired format, and (iii) finally writes the output to a file.

Here is my current simple Python code that does this:

import ujson

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

components = []
for line in f:  # f is the open (decompressed) input file handle
    components.append(manipulate(ujson.loads(line)))

write_to_csv(components)

This works, but with the Python GIL limiting it to one core on the server, it's painfully slow, especially with large amounts of data.

The amount of data I normally deal with is around 4 GB gzip-compressed, but occasionally I have to process data that is hundreds of GB gzip-compressed. It isn't necessarily Big Data, but it still can't all be processed in memory, and with Python's GIL it is very slow to process.

While searching for a solution to optimize our data processing, I came across Dask. While PySpark seemed like the obvious solution to me at the time, the promises of Dask and its simplicity won me over, so I decided to give it a try.

After a lot of research into dask and how to use it, I put together a very small script to replicate my current process. The script looks like this:

import dask.bag as bag
import json

bag.from_filenames('input.json.gz').map(json.loads).map(manipulate) \
   .concat().to_dataframe().to_csv('output.csv.gz')

This works and produces the same results as my original non-Dask script, but it still only uses one CPU on the server, so it didn't help at all. In fact, it's slower.

What am I doing wrong? Am I missing something? I'm still fairly new to Dask, so let me know if I've overlooked something or if I should be doing something different altogether.

Also, are there any alternatives to dask for using the full capacity of the server (i.e. all CPUs) for what I need to do?

Thanks,

T

asked Dec 03 '15 by tamjd1


People also ask

Does Dask use multiple cores?

The Dask library scales Python computation to multiple cores or even to multiple machines.
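
A minimal sketch of that scaling, assuming the optional dask.distributed package is installed: Client() with no arguments starts a local cluster that spreads work over all local cores, and pointing it at a scheduler address extends the same code to multiple machines.

# Hedged sketch: requires the optional dask.distributed package.
from dask.distributed import Client
import dask.bag as db

client = Client()  # no arguments -> local cluster using all local cores
b = db.from_sequence(range(1_000_000), npartitions=16)
print(b.map(lambda x: x * x).sum().compute())  # work is spread across the workers
client.close()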

How is a Dask bag different from a Dask DataFrame?

Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes. Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.
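
A small sketch of that handoff, using made-up records: to_dataframe turns a bag of dicts into a Dask DataFrame backed by pandas partitions, after which pandas-style operations run in parallel.

import dask.bag as db

records = db.from_sequence([
    {"name": "a", "value": 1},
    {"name": "b", "value": 2},
    {"name": "a", "value": 3},
])
df = records.to_dataframe()                          # bag of dicts -> Dask DataFrame
print(df.groupby("name")["value"].sum().compute())   # pandas-style, evaluated in parallel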

Why is the advent of Dask so important?

Dask can enable efficient parallel computations on single machines by leveraging their multi-core CPUs and streaming data efficiently from disk. It can run on a distributed cluster. Dask also allows the user to replace clusters with a single-machine scheduler which would bring down the overhead.
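
A short sketch of swapping single-machine schedulers on the same computation (keyword names from current Dask; the older version used in this thread passed get= functions such as dask.multiprocessing.get instead):

import dask.bag as db

total = db.from_sequence(range(100)).map(lambda x: x ** 2).sum()

total.compute(scheduler="synchronous")  # one thread, simplest to debug
total.compute(scheduler="threads")      # thread pool; pure-Python work stays GIL-bound
total.compute(scheduler="processes")    # process pool; sidesteps the GIL for CPU-bound work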


1 Answer

The problem here is with dask.dataframe.to_csv, which forces you into single-core mode.

I recommend using dask.bag to do your reading and manipulation and then dump to a bunch of CSV files in parallel. Dumping to many CSV files is a lot easier to coordinate than dumping to a single CSV file.

import dask.bag as bag
import json

b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t))).to_textfiles('out.*.csv')

There might also be an issue with trying to read a single GZIP file in parallel, but the above should get you started.
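
A rough sketch of one workaround, written with the same old-style API used in this thread (from_filenames was later renamed read_text, and concat to flatten), assuming the input has been pre-split into several smaller gzipped files; the input-part-*.json.gz names below are hypothetical.

import dask.bag as bag
import json

# gzip is not splittable, so one big .json.gz tends to become a single partition;
# several smaller .gz files give Dask independent pieces to work on in parallel.
b = (bag.from_filenames('input-part-*.json.gz')   # one partition per file
        .map(json.loads)
        .map(manipulate)                          # the same manipulate() as above
        .concat())
b.map(lambda t: ','.join(map(str, t))).to_textfiles('out.*.csv')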

answered by MRocklin