I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:
val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
  origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
  StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.
Is there a better way to do this? What can I do to optimize it?
Apply zipWithIndex to the RDD underlying the DataFrame. We have to convert the existing DataFrame into an RDD first, since zipWithIndex is only defined there. Because zipWithIndex starts its indices at 0 and we want to start from 1, we add 1 to the generated row id (rowId + 1); replace 1 with your own offset value if you need one. We also have to add the newly generated number to each existing row's list of values.
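A minimal sketch of that approach, assuming an input DataFrame df and a sqlContext in scope (the column name "rowId" and the +1 offset are just illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Convert to an RDD, zip each row with its index, and append the index (offset by 1) to the row.
val rddWithId = df.rdd.zipWithIndex.map { case (row, rowId) =>
  Row.fromSeq(row.toSeq :+ (rowId + 1))
}

// Extend the original schema with the new "rowId" column and rebuild the DataFrame.
val dfWithId = sqlContext.createDataFrame(
  rddWithId,
  StructType(df.schema.fields :+ StructField("rowId", LongType, nullable = false))
)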
Spark DataFrame schemas are defined as a collection of typed columns. The entire schema is stored as a StructType and individual columns are stored as StructFields.
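For example, a two-column schema might look like this (the column names are purely illustrative):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// A schema is a StructType made up of StructFields: one per column,
// each with a name, a data type, and a nullability flag.
val schema = StructType(Array(
  StructField("seq", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))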
What Are DataFrames? In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
A DataFrame is equivalent to a relational table in Spark SQL. The following example creates a DataFrame by pointing Spark SQL to a Parquet data set. Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in DataFrame, Column, and functions.
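A sketch of that, assuming a Parquet file at a hypothetical path people.parquet:

// Point Spark SQL at a Parquet data set; the path is hypothetical.
val people = sqlContext.read.parquet("people.parquet")

// Once created, the DataFrame can be manipulated with the DSL:
// Column expressions and the helpers in org.apache.spark.sql.functions.
import org.apache.spark.sql.functions.col
val adults = people.filter(col("age") >= 21).select(col("name"), col("age"))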
The following was posted on behalf of David Griffin (edited out of the question).
The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    // Zip each row with its index and splice the index into the row,
    // either at the front or at the back.
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq()) ++
          ln._1.toSeq ++
          (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    // Extend the schema with the index column in the matching position.
    StructType(
      (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]()) ++
        df.schema.fields ++
        (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
    )
  )
}
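A usage sketch (the column name and offset values here are just examples):

// Index column "seq" placed at the front, starting from 0.
val indexedDF = dfZipWithIndex(origDF, offset = 0, colName = "seq", inFront = true)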
Since Spark 1.6 there is a function called monotonically_increasing_id()
It generates a new column with a unique, monotonically increasing 64-bit value for each row.
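A minimal usage sketch (the column name "id" is illustrative):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Each row gets a unique 64-bit id; values within a partition are consecutive,
// but each partition starts from its own base, so ids are not globally consecutive.
val dfWithId = df.withColumn("id", monotonically_increasing_id())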
But it isn't consecutive: each partition starts a new range, so we must compute each partition's offset before using it.
Trying to provide an "rdd-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause an OOM.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  // Tag every row with its partition id and a monotonically increasing id.
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One value per partition: the amount to add to inc_id so the indices become consecutive.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  // Add each row's partition offset to its inc_id and drop the helper columns.
  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}

This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed and packed to Parquet in 2 minutes on 240 cores.
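A usage sketch, assuming an existing DataFrame df (the index column name is just an example):

// Adds a consecutive "row_num" column starting at 1.
val indexed = zipWithIndex(df, offset = 1, indexName = "row_num")
indexed.select("row_num").show(5)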
After testing my solution, I ran Kirk Broadhurst's solution, and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
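dfWithPartitionId is read twice inside zipWithIndex (once to collect the partition offsets and once to produce the indexed output), so caching it can avoid recomputing the upstream plan; a sketch of where the call could go:

// Cache the intermediate DataFrame because it is used twice.
val dfWithPartitionId = df
  .withColumn("partition_id", spark_partition_id())
  .withColumn("inc_id", monotonically_increasing_id())
  .cache()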