Is there any way I can shuffle a column of an RDD or DataFrame such that the entries in that column appear in random order? I'm not sure which APIs I could use to accomplish such a task.
Shuffle a DataFrame randomly by rows and columns: in pandas you can use df.sample(frac=1, axis=1).sample(frac=1).reset_index(drop=True) to shuffle the columns and then the rows.
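For reference, a minimal runnable sketch of that pandas one-liner (note this is pandas, not Spark; the toy frame is made up for illustration):

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
# sample(frac=1, axis=1) permutes the columns, sample(frac=1) permutes the rows,
# and reset_index(drop=True) discards the now-scrambled row index.
shuffled = df.sample(frac=1, axis=1).sample(frac=1).reset_index(drop=True)
print(shuffled)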
The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Depending on your data size, you may need to reduce or increase the number of partitions of an RDD/DataFrame via the spark.sql.shuffle.partitions configuration. Note that this kind of "shuffle" (repartitioning) is different from randomly reordering the values in a column, which is what the question asks about.
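For illustration, a minimal PySpark sketch of adjusting that setting (the values 50 and 8 below are arbitrary examples):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Number of partitions used for shuffles in DataFrame operations (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", "50")
df = spark.range(1000)
# groupBy triggers a shuffle, so the result has 50 partitions
# (it may differ if adaptive query execution coalesces partitions).
counts = df.groupBy((df.id % 10).alias("bucket")).count()
print(counts.rdd.getNumPartitions())
# Explicitly repartition a DataFrame to 8 partitions.
df8 = df.repartition(8)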
RDD – the RDD API is slower for simple grouping and aggregation operations. DataFrame – the DataFrame API is very easy to use, and is faster for exploratory analysis and for computing aggregated statistics on large data sets. Dataset – the Dataset API is likewise fast for performing aggregation operations over large amounts of data.
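As a rough illustration of the API difference, a minimal sketch of the same sum-by-key in both styles (toy data made up here; assumes a running SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# RDD API: you wire up the aggregation by hand with key-value pairs.
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(rdd.reduceByKey(lambda x, y: x + y).collect())

# DataFrame API: declarative groupBy/agg that the Catalyst optimizer can plan.
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
df.groupBy("key").sum("value").show()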
Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
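A minimal sketch of those shuffle-triggering calls on the RDD API (toy data; none of this is from the original answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
repartitioned = pairs.repartition(4)              # repartition: full shuffle
grouped = pairs.groupByKey()                      # 'ByKey op: co-locates equal keys
summed = pairs.reduceByKey(lambda x, y: x + y)    # also shuffles, with map-side combining
joined = pairs.join(sc.parallelize([("a", 10)]))  # join: shuffles both sides by key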
In case someone is looking for a PySpark equivalent of Sascha Vetter's post, you can find it below:
from pyspark.sql.functions import rand
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
import numpy as np

# assumes a running SparkSession `spark` and SparkContext `sc`

def add_index_to_row(row, index):
    # append the global row index produced by zipWithIndex to the row
    row_dict = row.asDict()
    row_dict["index"] = index
    return Row(**row_dict)

def add_index_to_df(df):
    df_with_index = df.rdd.zipWithIndex().map(lambda x: add_index_to_row(x[0], x[1]))
    # zipWithIndex yields long indices, hence LongType
    new_schema = StructType(df.schema.fields + [StructField("index", LongType(), True)])
    return spark.createDataFrame(df_with_index, new_schema)

def shuffle_single_column(df, column_name):
    df_cols = df.columns
    # select the desired column and shuffle it (i.e. order it by a column of random numbers)
    shuffled_col = df.select(column_name).orderBy(rand())
    # add an explicit index to the shuffled column
    shuffled_col_index = add_index_to_df(shuffled_col)
    # add an explicit index to the original dataframe
    df_index = add_index_to_df(df)
    # drop the column from df, join in the shuffled column on the index, then drop the index
    df_shuffled = df_index.drop(column_name).join(shuffled_col_index, "index").drop("index")
    # reorder columns so the shuffled column returns to its initial position instead of the last
    df_shuffled = df_shuffled.select(df_cols)
    return df_shuffled

# initialize a random array
z = np.random.randint(20, size=(10, 3)).tolist()
# create the pyspark dataframe
example_df = sc.parallelize(z).toDF(["a", "b", "c"])
# shuffle one column of the dataframe
example_df_shuffled = shuffle_single_column(df=example_df, column_name="a")
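As a quick sanity check (not part of the original answer), the shuffled column should contain the same multiset of values as before, just in a different order:

# same values in column "a" before and after, order aside
before = sorted(r["a"] for r in example_df.select("a").collect())
after = sorted(r["a"] for r in example_df_shuffled.select("a").collect())
assert before == after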
What about selecting the column to shuffle, ordering it by rand, and zipping it back by index to the existing dataframe?
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add a global row index to every row
  df.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  // Extend the schema with the index column
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
case class Entry(name: String, salary: Double)
val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)
val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false)
+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max |2001.21|3001.5 |
|Zhang|3111.32|3111.32 |
|Paul |3001.5 |2001.21 |
|Bob |1919.21|1919.21 |
+-----+-------+---------------+