Spark DataFrame mapPartitions

I need to run a distributed computation on a Spark DataFrame, invoking some arbitrary (non-SQL) logic on chunks of the DataFrame. I did:

def some_func(df_chunk):
    pan_df = df_chunk.toPandas()
    #whatever logic here

df = sqlContext.read.parquet(...)
result = df.mapPartitions(some_func)

Unfortunately it fails with:

AttributeError: 'itertools.chain' object has no attribute 'toPandas'

I expected to get a Spark DataFrame object within each map invocation; instead I got an 'itertools.chain'. Why? And how can I overcome this?

Иван Судос asked Aug 03 '16 16:08


People also ask

What is the difference between map and mapPartitions in spark?

mapPartitions() – This performs the same transformation as map(); the difference is that Spark mapPartitions() lets you do heavy initialization (for example, a database connection) once per partition instead of once per DataFrame row.
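The distinction above can be sketched in plain Python (not actual Spark API calls; the partition lists and helper functions below are illustrative stand-ins for what Spark does internally):

```python
# Illustrative sketch: mapPartitions-style processing receives one
# iterator per partition, so per-partition setup runs once, while
# map-style processing touches each row independently.

def map_style(rows, transform):
    # map(): transform is applied to every row on its own
    return [transform(r) for r in rows]

def map_partitions_style(partitions, func):
    # mapPartitions(): func receives the whole partition iterator
    out = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out

def per_partition(it):
    setup = object()  # stand-in for costly setup (e.g. a DB connection), done once per partition
    return [x * 2 for x in it]

partitions = [[1, 2], [3, 4, 5]]
result_map = map_style([1, 2, 3], lambda x: x * 2)       # -> [2, 4, 6]
result_mp = map_partitions_style(partitions, per_partition)  # -> [2, 4, 6, 8, 10]
```

Both produce row-for-row equivalent output; the win is that any expensive setup inside `per_partition` runs once per partition, not once per row.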

What is spark MapPartitionsRDD?

MapPartitionsRDD is an RDD that applies the provided function f to every partition of the parent RDD. By default, it does not preserve partitioning — the last input parameter preservesPartitioning is false . If it is true , it retains the original RDD's partitioning.


1 Answer

Try this:

>>> import pandas as pd
>>> columns = df.columns
>>> df.rdd.mapPartitions(lambda iter: [pd.DataFrame(list(iter), columns=columns)])
user6022341 answered Oct 27 '22 20:10