
Loading local file from client onto dask distributed cluster

A bit of a beginner question, but I was not able to find a relevant answer to this...

Essentially, my data (about 7 GB) is located on my local machine. I have a distributed cluster running on the local network. How can I get this file onto the cluster?

The usual dd.read_csv() or read_parquet() fails, as the workers aren't able to locate the file in their own environments.

Would I need to manually transfer the file to each node in the cluster?

Note: Due to admin restrictions I am limited to SFTP...

asked May 05 '17 by Bot Man

2 Answers

Two options

Network file systems

As suggested in the comments, there are various ways to make your local file accessible to the other machines in your cluster using normal file system solutions (NFS, Samba/SMB shares, and so on). This is a great choice if it is available to you.
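For example, a minimal sketch assuming the file is exported over NFS (or any shared file system) and mounted at the same path, here /mnt/shared, on every worker as well as on the machine submitting the work:

import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler-address:8786')

# '/mnt/shared' is a placeholder mount point; every worker must see the same path
ddf = dd.read_csv('/mnt/shared/myfile.csv')
ddf = ddf.persist()  # load the partitions into cluster memory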

Load and scatter locally

If that doesn't work then you can always load data locally and scatter it out to the various workers of your cluster. If your file is larger than the memory of your single computer then you might have to do this piece by piece.

Single pass

If everything fits in memory then I would load the data normally and then scatter it out to a worker. You could split it out afterwards and spread it to other workers if desired:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler-address:8786')  # connect to the cluster scheduler

df = pd.read_csv('myfile.csv')  # load the whole file into local memory
future = client.scatter(df)  # send dataframe to one worker
ddf = dd.from_delayed([future], meta=df)  # build dask.dataframe on remote data
ddf = ddf.repartition(npartitions=20).persist()  # split
client.rebalance(ddf)  # spread around all of your workers

Multiple bits

If you have multiple small files then you can iteratively load and scatter, perhaps in a for loop, and then make a dask.dataframe from many futures:

futures = []
for fn in filenames:
    df = pd.read_csv(fn)         # load one file locally
    future = client.scatter(df)  # send it to a worker
    futures.append(future)

ddf = dd.from_delayed(futures, meta=df)  # combine the remote pieces into one dask.dataframe

In this case you could probably skip the repartition and rebalance steps.

If you have a single large file then you would probably have to do some splitting of it yourself, for example with pd.read_csv(..., chunksize=...).
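For reference, a rough sketch of that chunked approach (the chunk size and file name are only illustrative):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler-address:8786')

futures = []
for chunk in pd.read_csv('myfile.csv', chunksize=1_000_000):  # read ~1M rows at a time
    futures.append(client.scatter(chunk))  # ship each chunk to the cluster as it is read

ddf = dd.from_delayed(futures, meta=chunk)  # one partition per chunk
ddf = ddf.persist()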

answered Sep 19 '22 by MRocklin


Network solution:

  • Under Windows only, it should work with a shared folder: dd.read_csv("\\server\shared_dir")

  • Under Unix/Linux only, it should work with HDFS: import hdfs3 and then hdfs.read_csv('/server/data_dir'...); see the sketch below.
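In case it helps, a rough sketch of both routes using dask.dataframe itself (the server name, paths, and the HDFS namenode/port are placeholders; the HDFS example assumes the data has already been copied into HDFS and that hdfs3 is installed on the Linux workers):

import dask.dataframe as dd

# Windows workers reading from a shared folder via a UNC path
# (raw string avoids backslash-escape problems):
ddf_win = dd.read_csv(r'\\server\shared_dir\myfile.csv')

# Linux workers reading from HDFS via dask's hdfs:// protocol support:
ddf_hdfs = dd.read_csv('hdfs://namenode:8020/data/myfile.csv')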

But if you want to use Windows AND Linux workers at the same time, I don't know of a solution: dd.read_csv() with a UNC path does not seem to be supported under Linux (because of the file path '\\server\data_dir'), and HDFS with hdfs.read_csv does not work under Windows (import hdfs3 fails because the lib libhdfs3.so doesn't exist under Windows).

Does anyone have a network solution for workers under both Windows and Unix?

answered Sep 21 '22 by Sébastien D.