 

Spark: possible to split DataFrame into parts for toPandas?

I have a 10 million record DataFrame. My requirement is that I need to do some operations on this data in pandas, and I do not have the memory to hold all 10 million records in pandas at once. So I want to be able to chunk the DataFrame and call toPandas on each chunk:

df = sqlContext.sql("select * from db.table")
# chunk the DataFrame to take X records at a time
# how do I generate chunked_df?
p_df = chunked_df.toPandas()
# do things to p_df

How do I chunk my DataFrame into either X equal parts or into parts by record count, say 1 million records at a time? Either solution is acceptable; I just need to process the data in smaller chunks.

asked Oct 26 '18 by test acc




1 Answer

One option is to use toLocalIterator in conjunction with repartition and mapPartitions.

import pandas as pd

columns = spark_df.schema.fieldNames()
# Convert each partition to a pandas DataFrame and pull them to the driver one partition at a time.
chunks = (
    spark_df.repartition(num_chunks).rdd
    .mapPartitions(lambda iterator: [pd.DataFrame(list(iterator), columns=columns)])
    .toLocalIterator()
)
for pdf in chunks:
    ...  # do work locally on each chunk as a pandas DataFrame

By using toLocalIterator, only one partition at a time is collected to the driver.
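For the original requirement of roughly 1 million records per chunk, num_chunks can be derived from the row count before repartitioning. A minimal sketch, assuming spark_df is the DataFrame from the question and that one extra count() pass over the data is acceptable:

import math

rows_per_chunk = 1_000_000  # target chunk size from the question
num_chunks = max(1, math.ceil(spark_df.count() / rows_per_chunk))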

Another option, which in my opinion is preferable, is to distribute your work across the cluster on the pandas chunks in each partition. This can be achieved using pandas_udf:

from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
    # do distributed work on a chunk of the original Spark DataFrame as a pandas DataFrame
    result_pdf = ...
    return result_pdf

spark_df = (
    spark_df.repartition(num_chunks)
    .groupby(spark_partition_id())
    .apply(transform_pandas_df_chunk)
)
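For completeness, here is a hedged sketch of what result_schema and the chunk function could look like; the column names (id, value) and the scaling step are illustrative assumptions, not part of the original answer:

import pandas as pd
from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical output schema -- adjust to the columns your result_pdf actually returns.
result_schema = StructType([
    StructField("id", StringType()),
    StructField("value_scaled", DoubleType()),
])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
    # pdf is one partition of the Spark DataFrame, delivered as a pandas DataFrame.
    out = pd.DataFrame()
    out["id"] = pdf["id"].astype(str)
    # Illustrative work: min-max scale the assumed "value" column within the chunk.
    rng = pdf["value"].max() - pdf["value"].min()
    out["value_scaled"] = (pdf["value"] - pdf["value"].min()) / rng if rng else 0.0
    return out

result_df = (
    spark_df.repartition(num_chunks)
    .groupby(spark_partition_id())
    .apply(transform_pandas_df_chunk)
)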
answered Sep 22 '22 by Zohar Meir