I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent of RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:
val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
  origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
  StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.
Is there a better way to do this? What can I do to optimize it?
Apply zipWithIndex to the RDD obtained from the DataFrame, so we first have to convert the existing DataFrame into an RDD. Since zipWithIndex starts its indices at 0 and we want to start from 1, we add 1 to the generated value as rowId + 1. Replace 1 with your own offset value, if any. We also have to append the newly generated number to the existing row's fields.
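A minimal sketch of that approach, assuming an existing DataFrame named df and an in-scope sqlContext (the column name rowId is illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// zipWithIndex starts at 0, so rowId + 1 starts the sequence at 1
val rddWithId = df.rdd.zipWithIndex.map { case (row, rowId) =>
  Row.fromSeq(row.toSeq :+ (rowId + 1))
}

val dfWithId = sqlContext.createDataFrame(
  rddWithId,
  StructType(df.schema.fields :+ StructField("rowId", LongType, nullable = false))
)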
Spark DataFrame schemas are defined as a collection of typed columns. The entire schema is stored as a StructType and individual columns are stored as StructFields.
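For illustration, a schema assembled by hand from StructFields might look like this (the column names here are made up, not from the question):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Array(
  StructField("seq",  LongType,   nullable = false),
  StructField("name", StringType, nullable = true)
))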
What Are DataFrames? In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
A DataFrame is equivalent to a relational table in Spark SQL. The following example creates a DataFrame by pointing Spark SQL to a Parquet data set. Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in DataFrame (this class), Column, and functions.
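A quick sketch of that DSL, with a placeholder Parquet path and column names (not taken from the question):

val people = sqlContext.read.parquet("/path/to/people.parquet")

people
  .filter(people("age") > 21)
  .select(people("name"), people("age") + 1)
  .groupBy("name")
  .count()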
The following was posted on behalf of David Griffin (edited out of the question).
The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq()) ++
          ln._1.toSeq ++
          (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]()) ++
        df.schema.fields ++
        (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
    )
  )
}
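For example, to prepend a "seq" column starting at 1 to the origDF from the question, a call might look like:

val seqDF = dfZipWithIndex(origDF, offset = 1, colName = "seq", inFront = true)
seqDF.show(5)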
Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonically increasing index for each row.
But the indices are not consecutive: each partition starts a new range, so we must calculate each partition's offset before using them.
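A quick illustration of those gaps; the exact values below are indicative only, since each partition draws its ids from its own block:

import org.apache.spark.sql.functions.monotonically_increasing_id

val demo = sqlContext.range(0, 6).repartition(3)
  .withColumn("inc_id", monotonically_increasing_id())

demo.show()
// inc_id might come out as 0, 1, 8589934592, 8589934593, 17179869184, 17179869185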
Trying to provide an "rdd-free" solution, I ended up with a collect(), but it only collects the offsets, one value per partition, so it will not cause OOM.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One value per partition: the index the first row of that partition should receive
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
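Usage sketch for the function above, with origDF again standing in for the real input DataFrame:

val indexed = zipWithIndex(origDF, offset = 1, indexName = "seq")
indexed.select("seq").show(5)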
This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) were read, indexed, and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
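If you do want the cache, one place to put it is inside zipWithIndex, since dfWithPartitionId is scanned twice (once to collect the per-partition offsets and once to build the result); this is only a sketch of that variant:

val dfWithPartitionId = df
  .withColumn("partition_id", spark_partition_id())
  .withColumn("inc_id", monotonically_increasing_id())
  .cache()
// ... compute partitionOffsets and the indexed result as above, then:
// dfWithPartitionId.unpersist()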