I am a newbie at Scala and Spark, please keep that in mind :)
Actually, I have three questions.
Thank you in advance.
Usually there should be no need for that, and it is better to use UDFs, but here you are:
How should I define a function to pass into df.rdd.mapPartitions, if I want to create a new Row with a few additional columns?
It should take an Iterator[Row] and return an Iterator[T], so in your case you should use something like this:

```scala
import org.apache.spark.sql.Row

def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
```
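As a quick preview of how this stub is wired up (a minimal sketch; the concrete body and the df used here are defined in the steps below):

```scala
// Rows of each partition are streamed through transformRows lazily;
// nothing executes until an action is run on the resulting RDD.
val transformed = df.rdd.mapPartitions(transformRows)
```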
How can I add a few columns to a Row object (or create a new one)?
There are multiple ways of accessing Row values, including the Row.get* methods, Row.toSeq, etc. A new Row can be created using Row.apply, Row.fromSeq, Row.fromTuple, or RowFactory. For example:
```scala
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
```
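For completeness, here is a small sketch of the other Row constructors and accessors mentioned above (the values are made up for illustration):

```scala
import org.apache.spark.sql.Row

val row = Row(1.0, 2.0)                   // Row.apply: build a Row from values
val fromTuple = Row.fromTuple((1.0, 2.0)) // Row.fromTuple: build a Row from a tuple
val x = row.getDouble(0)                  // typed positional access
val asSeq = row.toSeq                     // view all values as Seq[Any]
```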
How can I create a DataFrame from the created RDD?
If you have an RDD[Row], you can use SQLContext.createDataFrame and provide the schema.
Putting this all together:
```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+
```
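A small usage note: on Spark 2.x and later, SQLContext has been superseded by SparkSession; the same pattern works there, assuming a SparkSession named spark is in scope:

```scala
// Equivalent call on Spark 2.x+, assuming `spark` is a SparkSession
spark.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
```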