Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Randomly shuffle column in Spark RDD or dataframe

Is there anyway I can shuffle a column of an RDD or dataframe such that the entries in that column appear in random order? I'm not sure which APIs I could use to accomplish such a task.

like image 950
Ben McCann Avatar asked May 17 '16 22:05

Ben McCann


People also ask

How do I randomly shuffle a column in a DataFrame?

Shuffle DataFrame Randomly by Rows and Columns You can use df. sample(frac=1, axis=1). sample(frac=1). reset_index(drop=True) to shuffle rows and columns randomly.

What is shuffle RDD in spark?

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions, based on your data size you may need to reduce or increase the number of partitions of RDD/DataFrame using spark.

Why DataFrame is faster than RDD in spark?

RDD – RDD API is slower to perform simple grouping and aggregation operations. DataFrame – DataFrame API is very easy to use. It is faster for exploratory analysis, creating aggregated statistics on large data sets. DataSet – In Dataset it is faster to perform aggregation operation on plenty of data sets.

What spark operations cause shuffle?

Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.


2 Answers

In case someone is looking for a PySpark equivalent of Sascha Vetter's post, you can find it below:

from pyspark.sql.functions import rand
from pyspark.sql import Row
from pyspark.sql.types import *

def add_index_to_row(row, index):
  print(index)
  row_dict = row.asDict()
  row_dict["index"] = index
  return Row(**row_dict)

def add_index_to_df(df):
  df_with_index = df.rdd.zipWithIndex().map(lambda x: add_index_to_row(x[0], x[1]))
  new_schema = StructType(df.schema.fields + [StructField("index", IntegerType(), True)])
  return spark.createDataFrame(df_with_index, new_schema)

def shuffle_single_column(df, column_name):
  df_cols = df.columns
  # select the desired column and shuffle it (i.e. order it by column with random numbers)
  shuffled_col = df.select(column_name).orderBy(F.rand())
  # add explicit index to the shuffled column
  shuffled_col_index = add_index_to_df(shuffled_col)
  # add explicit index to the original dataframe
  df_index = add_index_to_df(df)
  # drop the desired column from df, join it with the shuffled column on created index and finally drop the index column
  df_shuffled = df_index.drop(column_name).join(shuffled_col_index, "index").drop("index")
  # reorder columns so that the shuffled column comes back to its initial position instead of the last position
  df_shuffled = df_shuffled.select(df_cols)
  return df_shuffled

# initialize random array
z = np.random.randint(20, size=(10, 3)).tolist()
# create the pyspark dataframe
example_df = sc.parallelize(z).toDF(("a","b","c"))
# shuffle one column of the dataframe
example_df_shuffled = shuffle_single_column(df = example_df, column_name = "a")
like image 153
jared3412341 Avatar answered Sep 24 '22 01:09

jared3412341


What about selecting the column to shuffle, orderBy(rand) the column and zip it by index to the existing dataframe?

import org.apache.spark.sql.functions.rand

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

case class Entry(name: String, salary: Double)

val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)

val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false) 

+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max  |2001.21|3001.5         |
|Zhang|3111.32|3111.32        |
|Paul |3001.5 |2001.21        |
|Bob  |1919.21|1919.21        |
+-----+-------+---------------+
like image 29
Sascha Vetter Avatar answered Sep 22 '22 01:09

Sascha Vetter