
How to add columns into org.apache.spark.sql.Row inside of mapPartitions

I am a newbie at Scala and Spark, so please keep that in mind :)

Actually, I have three questions:

  1. How should I define a function to pass to df.rdd.mapPartitions, if I want to create a new Row with a few additional columns?
  2. How can I add a few columns to a Row object (or create a new one)?
  3. How do I create a DataFrame from the resulting RDD?

Thank you in advance

asked Nov 23 '15 by Azat Fazulzyanov


1 Answer

Usually there should be no need for this, and it is better to use UDFs, but here you are:
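(Not part of the original answer: a minimal sketch of the simpler alternative the answer alludes to, using the df, x and y from the full example further down. The names withConstants, plusOne and withUdf are just illustrative; for constant extra columns lit is enough, and a UDF is only needed when the new column is computed from existing ones.)

import org.apache.spark.sql.functions.{lit, udf}

// Constant extra columns: no UDF needed, lit is enough
val withConstants = df.withColumn("z", lit(-1)).withColumn("v", lit(1))

// A column computed from an existing one via a UDF
val plusOne = udf((x: Double) => x + 1)
val withUdf = df.withColumn("xPlusOne", plusOne(df("x")))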

How should I define a function to pass to df.rdd.mapPartitions, if I want to create a new Row with a few additional columns

It should take an Iterator[Row] and return an Iterator[T], so in your case you should use something like this:

import org.apache.spark.sql.Row

def transformRows(iter: Iterator[Row]): Iterator[Row] = ???

How can I add a few columns to a Row object (or create a new one)

There are multiple ways of accessing Row values, including the Row.get* methods, Row.toSeq, etc. A new Row can be created using Row.apply, Row.fromSeq, Row.fromTuple or RowFactory. For example:

def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
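(Not from the original answer: a small sketch combining the Row.get* accessors with Row.fromSeq, assuming the first two fields are doubles as in the example below; appendSum is a hypothetical helper name.)

// Read existing values by position, then append their sum as a new column
def appendSum(row: Row): Row = {
  val x = row.getDouble(0)
  val y = row.getDouble(1)
  Row.fromSeq(row.toSeq :+ (x + y))
}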

How do I create a DataFrame from the resulting RDD

If you have an RDD[Row] you can use SQLContext.createDataFrame and provide the schema.

Putting this all together:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+
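(Not in the original answer: if you are on Spark 2.x or later, SparkSession replaces SQLContext; a hedged sketch of the equivalent call, reusing the df, transformRows and newSchema defined above.)

import org.apache.spark.sql.SparkSession

// Obtain (or reuse) the session and build the DataFrame from the transformed RDD[Row]
val spark = SparkSession.builder().getOrCreate()
val transformed = spark.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema)
transformed.show()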
answered Sep 26 '22 by zero323