Spark Scala 2.10 tuple limit

I have a DataFrame with 66 columns to process (almost every column value needs to be changed in some way), so I'm running the following statement:

    val result = data.map(row => (
        modify(row.getString(row.fieldIndex("XX"))),
        (...)
      )
    )

up to the 66th column. Since Scala in this version limits tuples to a maximum of 22 elements, I cannot do it this way. Is there any workaround? After all the row operations I convert the result to a DataFrame with specific column names:

    result.toDF("c1",...,"c66")
    result.registerTempTable("someFancyResult")

"modify" function is just an example to show my point

asked Mar 14 '23 by Silverrose
1 Answer

If all you do is modify values in an existing DataFrame, it is better to use a UDF instead of mapping over an RDD:

    import org.apache.spark.sql.functions.udf
    import sqlContext.implicits._  // for the $"colName" column syntax (Spark 1.x)

    val modifyUdf = udf(modify _)
    data.withColumn("c1", modifyUdf($"c1"))
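
With 66 columns you probably don't want to write 66 `withColumn` calls by hand. A minimal sketch (assuming every column is a string and should be passed through the same `modify` function, whose body below is purely illustrative) that folds the UDF over all column names; note that `withColumn` replaces a column of the same name in Spark 1.4+:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.udf

    // Hypothetical transformation; replace with your own logic.
    def modify(s: String): String = if (s == null) s else s.trim

    val modifyUdf = udf(modify _)

    // Apply the UDF to every column, keeping the original column names.
    val result: DataFrame = data.columns.foldLeft(data) {
      (df, name) => df.withColumn(name, modifyUdf(df(name)))
    }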

If for some reason the above doesn't fit your needs, the simplest thing you can do is to recreate a DataFrame from an RDD[Row], for example like this:

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, StringType}

    val result: RDD[Row] = data.map(row => {
      val buffer = ArrayBuffer.empty[Any]

      // Add value to buffer
      buffer.append(modify(row.getAs[String]("c1")))

      // ... repeat for the other values

      // Build row
      Row.fromSeq(buffer)
    })

    // Create schema
    val schema = StructType(Seq(
      StructField("c1", StringType, false),
      // ...
      StructField("c66", StringType, false)
    ))

    sqlContext.createDataFrame(result, schema)
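
Since writing out 66 `append` calls and 66 `StructField`s by hand is tedious, and the column names follow a pattern, both the row contents and the schema can be generated. A rough sketch under the same assumptions (all 66 columns are strings named `c1` through `c66`, each transformed by the same `modify` function):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, StringType}

    val names = (1 to 66).map(i => s"c$i")

    // Transform every column of every row.
    val result: RDD[Row] = data.map { row =>
      Row.fromSeq(names.map(name => modify(row.getAs[String](name))))
    }

    // One non-nullable string field per column, matching the schema above.
    val schema = StructType(names.map(name => StructField(name, StringType, false)))

    sqlContext.createDataFrame(result, schema)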
answered Mar 31 '23 by zero323