I have a DataFrame with 10 million records. I need to do some operations on this data in pandas, but I do not have enough memory to hold all 10 million records in pandas at once. So I want to chunk it and call toPandas on each chunk.
df = sqlContext.sql("select * from db.table")
# do chunking to take X records at a time
# how do I generate chunked_df?
p_df = chunked_df.toPandas()
# do things to p_df
How do I chunk my DataFrame into either x equal parts or into parts by record count, say 1 million rows at a time? Either approach is acceptable; I just need to process it in smaller chunks.
A few splitting approaches that commonly come up:
DataFrame.limit() limits the result count to the number specified, so it can take the first 'n' rows of a Spark DataFrame as one slice.
pandas' iloc indexer slices a DataFrame into smaller DataFrames by row or column position, but it only applies once the data is already in pandas.
randomSplit() returns multiple slices of a Spark DataFrame according to a list of fractions you pass in; the rows are assigned to the slices randomly (a sketch follows below).
pyspark.sql.functions.split() is about strings rather than chunking: it converts a delimiter-separated string column (split on space, comma, pipe, etc.) into an ArrayType column.
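A minimal sketch of the randomSplit route, assuming the df from the question and roughly ten equal chunks (exact sizes vary because rows are assigned randomly, and each toPandas() call triggers its own Spark job):

weights = [1.0] * 10
for chunk in df.randomSplit(weights, seed=42):
    p_df = chunk.toPandas()  # only this slice is in pandas memory
    # do things to p_df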
One option is to use toLocalIterator in conjunction with repartition and mapPartitions.
import pandas as pd

columns = spark_df.schema.fieldNames()
chunks = spark_df.repartition(num_chunks).rdd \
    .mapPartitions(lambda iterator: [pd.DataFrame(list(iterator), columns=columns)]) \
    .toLocalIterator()

for pdf in chunks:
    # do work locally on chunk as pandas df
    ...
By using toLocalIterator, only one partition at a time is collected to the driver.
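A usage sketch tying this to the "1 million rows at a time" requirement from the question; deriving num_chunks from a count() is my assumption, and df is the DataFrame from the question:

import math
import pandas as pd

total_rows = df.count()
num_chunks = math.ceil(total_rows / 1_000_000)  # ~1 million rows per chunk

columns = df.schema.fieldNames()
chunks = df.repartition(num_chunks).rdd \
    .mapPartitions(lambda it: [pd.DataFrame(list(it), columns=columns)]) \
    .toLocalIterator()

for p_df in chunks:
    # only this chunk is held in driver memory as a pandas DataFrame
    print(len(p_df))  # placeholder for the real per-chunk work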
Another option, which in my opinion is preferable, is to distribute your work across the cluster on the pandas chunks in each partition. This can be achieved using pandas_udf:
from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
    # do distributed work on a chunk of the original spark dataframe as a pandas dataframe
    result_pdf = ...
    return result_pdf

spark_df = spark_df.repartition(num_chunks).groupby(spark_partition_id()).apply(transform_pandas_df_chunk)
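The GROUPED_MAP pandas_udf style above is the Spark 2.x API; on Spark 3.x the same pattern is usually written with applyInPandas. A minimal sketch, assuming result_schema describes the columns the function returns:

from pyspark.sql.functions import spark_partition_id

def transform_pandas_df_chunk(pdf):
    # same per-chunk pandas logic as above; pdf arrives as a pandas DataFrame
    result_pdf = pdf  # placeholder
    return result_pdf

spark_df = spark_df.repartition(num_chunks) \
    .groupby(spark_partition_id()) \
    .applyInPandas(transform_pandas_df_chunk, schema=result_schema)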