How to apply a large Python model to a PySpark DataFrame?

I have:

  • A large DataFrame (Parquet format, 100,000,000 rows, ~4.5 TB) containing the feature data
  • Several huge ML models (each one takes 5-15 GB of RAM)
  • A Spark cluster (AWS EMR); a typical node has 8 CPUs and 32 GB of RAM, and the configuration can be changed if needed.

I want to apply the models using PySpark, but I always get weird errors like:

  • OOM
  • Random timeouts (a node doesn't return any result) -> the node is killed by the YARN manager

I have typically used code like

def apply_model(partition):
    model = load(...)  # load the model inside this function to avoid serialization issues
    for row in partition:
        yield model.infer(row)

or

def apply_model(partition):
    model = load(...)  # load the model inside this function to avoid serialization issues
    yield from model.infer(partition)

and apply that using

df.select(...).rdd.mapPartitions(apply_model)

I can't broadcast the model because of serialization issues.

The question: how do I apply a big Python (or any other non-JVM-based) model to a Spark DataFrame while avoiding Spark exceptions?

asked May 15 '19 by Ivan Menshikh

1 Answer

Here are some additional suggestions that could help improve the performance of your job:

  • The first change I would make is to reduce the partition size. If I understood correctly, you currently have 4.5 TB of input data. That means that with 1,000 partitions you would end up sending 4.5 GB per partition to each executor! This is quite large; instead, I would try to keep the partition size between 250-500 MB. Roughly, in your case that means ~10,000 partitions (4.5 TB / 500 MB), as shown in the sketch after this list.

  • Increase parallelism by adding more executors. That would increase data locality and consequently reduce execution time. Ideally you should have 5 cores per executor and, if possible, two executors per cluster node. The number of cores per executor should not be higher than 5, since that can cause I/O bottlenecks (when/if disk storage is used).

  • As for memory, I think the suggestions from @rluta are more than sufficient. In general, too large a value for executor memory has a negative effect on Java GC time, so an upper limit of around 10 GB is a good value for spark.executor.memory.
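
For illustration, here is a minimal sketch of how these suggestions could fit around your existing apply_model function. The input path, the load(...) call and model.infer are your own placeholders; the executor settings and the partition count are assumptions derived from the numbers above, not exact values for your cluster:

# Submit with an executor layout along the lines described above
# (illustrative values, to be tuned for your EMR node types):
#
#   spark-submit \
#     --conf spark.executor.cores=5 \
#     --conf spark.executor.memory=10g \
#     my_job.py

def apply_model(partition):
    model = load(...)  # load the model once per partition, on the executor
    yield from model.infer(partition)

df = spark.read.parquet("s3://.../features/")  # hypothetical input path

# ~10,000 partitions keeps each partition around 250-500 MB for ~4.5 TB of input
predictions = df.select(...).repartition(10000).rdd.mapPartitions(apply_model)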

answered Sep 28 '22 by abiratsis