DataFrame-ified zipWithIndex

I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:

val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
  origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
  StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.

Is there a better way to do this? What can I do to optimize it?

asked May 18 '15 by David Griffin


People also ask

How do you use zipWithIndex in Pyspark?

zipWithIndex is only available on RDDs, so first convert the existing DataFrame into an RDD and apply zipWithIndex there. Since zipWithIndex starts its indices at 0 and we want to start from 1, we add 1 to the index ("[rowId+1]"); replace 1 with your own offset value if you have one. The newly generated number then has to be appended to the existing row's values.

What is the schema of a DataFrame?

Spark DataFrame schemas are defined as a collection of typed columns. The entire schema is stored as a StructType and individual columns are stored as StructFields.
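For instance, a minimal schema might be built like this (the column names here are purely illustrative):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// A schema is a StructType wrapping an ordered collection of StructFields.
val schema = StructType(Array(
  StructField("seq", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))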

What is DataFrame in Spark with example?

What Are DataFrames? In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
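As a quick, illustrative sketch (column names made up, and the SQLContext implicits assumed to be in scope as in the Spark shell), a local collection can be turned into a DataFrame:

import sqlContext.implicits._

// Illustrative only: a two-column DataFrame built from a local Seq.
val df = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")
df.printSchema()
df.show()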

How do you declare a DataFrame in Java?

A DataFrame is equivalent to a relational table in Spark SQL. The following example creates a DataFrame by pointing Spark SQL to a Parquet data set. Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: DataFrame (this class), Column , and functions .
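A sketch of that Parquet-based creation, shown here in Scala (the path is made up, a SQLContext named sqlContext is assumed, and the read API is Spark 1.4+ style):

// Illustrative: point Spark SQL at a Parquet data set, then use the DSL on the result.
val people = sqlContext.read.parquet("/data/people.parquet")
people.select("name").show()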


2 Answers

The following was posted on behalf of David Griffin (edited out of the question).

The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
    )
  )
}
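A hypothetical call site, assuming a DataFrame named df already exists:

// Illustrative usage: append a 1-based index column named "row_num" at the back of the DataFrame.
val indexed = dfZipWithIndex(df, offset = 1, colName = "row_num", inFront = false)
indexed.show(5)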
answered Sep 29 '22 by 4 revs, 4 users 87%


Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonically increasing index for each row.
But the values are not consecutive: each partition starts a new range, so we must calculate each partition's offset before using it.
Trying to provide an "rdd-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause OOM.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One offset per partition: the cumulative row count of the preceding partitions,
  // shifted by the partition's first monotonic id and the requested starting offset.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
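A hypothetical usage sketch, assuming a DataFrame named df has already been loaded:

// Illustrative usage: add a consecutive, 1-based "index" column without touching the row data.
val indexedDf = zipWithIndex(df)
indexedDf.orderBy("index").show(5)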

This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.

answered Sep 29 '22 by Evgeny Glotov