Sharing objects across workers using pyarrow

I would like to give multiple worker processes created by multiprocessing.Pool.map() read-only access to a shared DataFrame.

I would like to avoid copying and pickling.

I understand that pyarrow can be used for this, but I find its documentation quite cumbersome. Can anybody provide an example of how it can be done?

Konstantin asked Feb 07 '19

1 Answer

The example at https://github.com/apache/arrow/blob/master/python/examples/plasma/sorting/sort_df.py is a working example that shares a Pandas dataframe between multiple workers using Python multiprocessing (note that it requires you to build a small Cython library in order to run it).

The dataframe is shared via Arrow's Plasma object store.
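As a rough sketch of that pattern (simplified from the linked example, and assuming a pyarrow version that still ships the pyarrow.plasma module, which was later deprecated and removed; the /tmp/plasma socket path, the 1 GB store size, and the helper names put_df, get_df and worker are all just illustrative choices), you can start a Plasma store, write the dataframe into it once, and let each multiprocessing.Pool worker map it back to a Pandas dataframe without copying the underlying buffers:

import multiprocessing
import subprocess
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.plasma as plasma

PLASMA_SOCKET = "/tmp/plasma"  # arbitrary socket path chosen for this sketch


def put_df(client, df):
    """Serialize df as an Arrow record batch and seal it in the Plasma store."""
    record_batch = pa.RecordBatch.from_pandas(df)
    object_id = plasma.ObjectID(np.random.bytes(20))

    # First pass: measure the serialized size with a mock sink.
    mock_sink = pa.MockOutputStream()
    writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    writer.write_batch(record_batch)
    writer.close()

    # Second pass: write into a shared-memory buffer of exactly that size.
    buf = client.create(object_id, mock_sink.size())
    writer = pa.RecordBatchStreamWriter(pa.FixedSizeBufferWriter(buf),
                                        record_batch.schema)
    writer.write_batch(record_batch)
    writer.close()
    client.seal(object_id)
    return object_id


def get_df(client, object_id):
    """Map the sealed object back to a DataFrame without copying the buffers."""
    [buf] = client.get_buffers([object_id])
    reader = pa.RecordBatchStreamReader(pa.BufferReader(buf))
    return reader.read_next_batch().to_pandas()


def worker(object_id_bytes):
    # Each pool worker connects to the same store and reads the shared frame.
    client = plasma.connect(PLASMA_SOCKET)
    df = get_df(client, plasma.ObjectID(object_id_bytes))
    return df.values.sum()  # read-only work on the shared data


if __name__ == "__main__":
    # Start a 1 GB Plasma store; the plasma_store executable ships with pyarrow.
    store = subprocess.Popen(
        ["plasma_store", "-m", str(10**9), "-s", PLASMA_SOCKET])
    time.sleep(1)  # give the store a moment to come up
    try:
        client = plasma.connect(PLASMA_SOCKET)
        df = pd.DataFrame(np.random.normal(size=(1000, 10)))
        object_id = put_df(client, df)

        # ObjectIDs are passed to the workers as raw bytes so they pickle cleanly.
        with multiprocessing.Pool(4) as pool:
            print(pool.map(worker, [object_id.binary()] * 4))
    finally:
        store.terminate()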

If you are not tied to Python multiprocessing, you can use Ray to do what you want with simpler syntax.

To give multiple workers read-only access to a Pandas dataframe, you can do the following.

import numpy as np
import pandas
import ray

ray.init()

df = pandas.DataFrame(np.random.normal(size=(1000, 10)))

@ray.remote
def f(df):
    # This task will run on a worker and have read-only access to the
    # dataframe. For example, "df.iloc[0][0] = 1" will raise an exception.
    try:
        df.iloc[0][0] = 1
    except ValueError:
        pass
    return df.iloc[0][0]

# Serialize the dataframe with pyarrow and store it in shared memory.
df_id = ray.put(df)

# Run four tasks that have access to the dataframe.
result_ids = [f.remote(df_id) for _ in range(4)]

# Get the results.
results = ray.get(result_ids)

Note that the line df_id = ray.put(df) can be omitted (and you can directly call f.remote(df)). In that case, df will still be stored in shared memory and shared with the workers, but it will be stored 4 times (once for each call to f.remote(df)), which is less efficient.
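For comparison, that less efficient variant looks like this:

# Equivalent, but the dataframe is serialized and stored once per task
# instead of once overall.
result_ids = [f.remote(df) for _ in range(4)]
results = ray.get(result_ids)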

Robert Nishihara answered Oct 17 '22