I need to run a distributed calculation on a Spark DataFrame, invoking some arbitrary (non-SQL) logic on chunks of the DataFrame. I did:
def some_func(df_chunk):
    pan_df = df_chunk.toPandas()
    # whatever logic here

df = sqlContext.read.parquet(...)
result = df.mapPartitions(some_func)
Unfortunately, this leads to:
AttributeError: 'itertools.chain' object has no attribute 'toPandas'
I expected to get a Spark DataFrame object within each map invocation; instead I got an 'itertools.chain'. Why? And how do I overcome this?
mapPartitions() is similar to map(); the difference is that Spark calls your mapPartitions() function once per partition rather than once per row, which lets you do heavy initialization (for example, a database connection) once for each partition instead of for every row. Crucially, the function does not receive a DataFrame: each partition is handed over as a plain Python iterator over Row objects (under the hood often an itertools.chain), which is exactly why toPandas() fails in your code.
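To illustrate the once-per-partition pattern, here is a minimal sketch; connect_to_db() and enrich() are hypothetical placeholders for your own setup and per-row logic:

def per_partition(rows):
    conn = connect_to_db()       # heavy setup, done once per partition
    for row in rows:             # rows is a plain iterator of Row objects
        yield enrich(conn, row)  # per-row work reuses the connection
    conn.close()

result = df.rdd.mapPartitions(per_partition)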
MapPartitionsRDD is an RDD that applies the provided function f to every partition of the parent RDD. By default, it does not preserve partitioning: the last input parameter, preservesPartitioning, is false. If it is true, it retains the original RDD's partitioning.
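In PySpark you can set this flag directly on RDD.mapPartitions. A small sketch, assuming a key-preserving transformation (the names pairs and scale_values are illustrative):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)]).partitionBy(2)

def scale_values(rows):
    # keys are left untouched, so the existing partitioning stays valid
    return ((k, v * 10) for k, v in rows)

scaled = pairs.mapPartitions(scale_values, preservesPartitioning=True)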
Try this:
>>> import pandas as pd
>>> columns = df.columns
>>> df.rdd.mapPartitions(lambda rows: [pd.DataFrame(list(rows), columns=columns)])
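Here is a sketch of how this could plug into your original some_func, assuming your pandas logic returns a DataFrame with the same columns (process_partition and the pass-through body are illustrative):

import pandas as pd

def some_func(pan_df):
    # whatever pandas logic here (pass-through as a placeholder)
    return pan_df

columns = df.columns

def process_partition(rows):
    pan_df = pd.DataFrame(list(rows), columns=columns)
    # yield plain tuples so Spark can rebuild a DataFrame afterwards
    for record in some_func(pan_df).itertuples(index=False, name=None):
        yield record

result = df.rdd.mapPartitions(process_partition)
result_df = result.toDF(columns)  # back to a Spark DataFrame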