I figured I would ask the question: I've found a clever way to reduce the size of a PySpark DataFrame before converting it to pandas, and I was just wondering, does the toPandas function get faster as the size of the PySpark DataFrame gets smaller? Here is some code:
import pandas as pd
from pyspark.sql import functions as F, Window

# A constant-valued window so row_number() numbers every row in the DataFrame
window = Window.partitionBy(F.lit('A')).orderBy(F.lit('A'))
eps_tfs = {}
while True:
    # Bring the next chunk of at most 2500 rows to the driver
    pdf = toPandas(conn.select(F.col('*')).where(F.col('row_number') <= 2500))
    n = len(pdf)
    trigger = 0
    for u in pdf['features']:
        indices = [i for i, x in enumerate(u) if x == 1.0]
        for idx in range(len(eps_columns)):
            if idx in indices:
                try:
                    eps_tfs[eps_columns[idx]].append(True)
                except KeyError:
                    eps_tfs[eps_columns[idx]] = [True]
            else:
                try:
                    eps_tfs[eps_columns[idx]].append(False)
                except KeyError:
                    eps_tfs[eps_columns[idx]] = [False]
    full_view = full_view.append(pd.concat([pdf, pd.DataFrame(eps_tfs)], axis=1))
    # Drop the processed rows and renumber what is left
    conn = conn.select(F.col('*')).where(F.col('row_number') > 2500)
    conn = conn.drop("row_number")
    conn = conn.select(F.col('*'), F.row_number().over(window).alias('row_number'))
    eps_tfs = {}
    del pdf
    if n < 2500:
        break
Also, is the following code really a faster way to convert the DataFrame to pandas?
def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df: pyspark.sql.DataFrame
    :param n_partitions: int or None
    :return: pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand
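For reference, here is a minimal usage sketch of this helper; the SparkSession setup, example columns, and partition count are assumptions for illustration, not part of the original question:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("mappartitions-topandas").getOrCreate()
sdf = spark.range(0, 100_000).withColumn("value", F.rand(seed=42))

# Each partition becomes one pandas DataFrame on the executors; the driver then
# collects and concatenates them. n_partitions controls the size of each chunk.
pdf = toPandas(sdf, n_partitions=8)
print(len(pdf), list(pdf.columns))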
Is there any better way to go about doing this?
In our case, we found that skipping toPandas() entirely and using pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) (where self is the Spark DataFrame) was fastest. We couldn't use the Arrow option because we got the error "arrow is not supported when using file-based collect".
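As a rough standalone version of that idea (the helper name fast_to_pandas is made up here, and the dtypes are left entirely to pandas' inference):

import pandas as pd

def fast_to_pandas(sdf):
    """Collect a Spark DataFrame to the driver and build a pandas DataFrame directly."""
    # sdf.collect() returns a list of Row objects (tuple subclasses), which
    # from_records turns into columns without the extra per-Series copying
    # that DataFrame.toPandas() does internally.
    return pd.DataFrame.from_records(sdf.collect(), columns=sdf.columns)

# Usage, assuming `sdf` is an existing pyspark.sql.DataFrame:
# pdf = fast_to_pandas(sdf)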
Looking at the source code for toPandas(), one reason it may be slow is that it first creates the pandas DataFrame and then copies each of the Series in that DataFrame over to the returned DataFrame. If you know that all of your columns have unique names, and that the data types will convert nicely by letting pandas infer the dtypes, there is no need to do any of that copying or dtype inference.
Side note: we were converting a Spark DataFrame on Databricks with about 2 million rows and 6 columns, so your mileage may vary depending on the size of your conversion.
The answer by @EZY is correct (you do need to collect all rows to the driver or client). However, there is one more optimisation possible with the Apache Arrow integration: Arrow transfers the data in a columnar format that maps efficiently onto NumPy and pandas data types. It's not enabled by default, so you need to turn it on with a Spark conf setting like the one below.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
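A minimal end-to-end sketch of that setting (the app name and example DataFrame are assumptions; on Spark versions before 3.0 the key was spark.sql.execution.arrow.enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-topandas").getOrCreate()

# Enable Arrow-based columnar transfers for toPandas()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

sdf = spark.range(0, 1_000_000).selectExpr("id", "id * 2 AS doubled")

# With Arrow enabled, the conversion avoids row-by-row serialization on the driver
pdf = sdf.toPandas()
print(pdf.dtypes)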