 

How to create a Row from a List or Array in Spark using Scala

I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on user input, but I can't find a way to construct a Row dynamically.

Is there any functionality to create a Row from a List or an Array?

For example, if I have a .csv file in the following format:

"91xxxxxxxxxx,21.31,15,0,0"

If the user inputs [1, 2], then I need to take only the 2nd and 3rd columns, along with the customer_id, which is the first column.

I tried to parse it with the following code:

val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => foo(input, r(0)))

where foo is defined as

def foo(input: List[Int], s: String): Row = {
    val n = input.length
    val out = new Array[Any](n + 1)
    val r = s.split(",")
    out(0) = r(0)
    for (i <- 1 to n)
        out(i) = r(input(i - 1)).toDouble
    Row(out)
}

and input is a List, say

val input = List(1,2)

Executing this code I get l3 as:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])

But what I want is:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])

This then has to be combined with a schema to create a DataFrame in Spark SQL.

asked Jan 23 '15 by Anju

People also ask

What is explode function in Scala?

The Spark SQL explode function is used to split an array or map column of a DataFrame into rows. Spark defines several flavors of this function: explode_outer, which also handles nulls and empty collections; posexplode, which explodes along with the position of each element; and posexplode_outer, which combines the two.
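
For instance, a minimal sketch of explode and its variants (the DataFrame, column names, and values below are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, explode_outer, posexplode}

val spark = SparkSession.builder.appName("explode-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical DataFrame with an array column
val df = Seq(
  ("alice", Seq(1, 2, 3)),
  ("bob",   Seq.empty[Int])
).toDF("name", "scores")

// explode drops rows whose array is empty or null; explode_outer keeps them with a null value
df.select($"name", explode($"scores").as("score")).show()
df.select($"name", explode_outer($"scores").as("score")).show()

// posexplode also returns each element's position within the array
df.select($"name", posexplode($"scores")).show()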

How do I create a row in Scala?

To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala. A Row object can be constructed by providing the field values.
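
For example (the values below are illustrative):

import org.apache.spark.sql.Row

// Row.apply takes the field values directly
val r1 = Row("91xxxxxxxxxx", 21.31, 15)

// Row.fromSeq builds a Row from an existing Seq of values
val values: Seq[Any] = Seq("91xxxxxxxxxx", 21.31, 15)
val r2 = Row.fromSeq(values)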

How do I create a row number in Spark?

row_number() is a window function in Spark SQL that assigns a sequential integer to each row in the result DataFrame. It is used with Window.partitionBy(), which partitions the data into window frames, and an orderBy() clause to sort the rows within each partition.
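
A small sketch, assuming an existing DataFrame df with customer_id and amount columns (both names are hypothetical):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Window frames partitioned by customer and ordered by amount, highest first
val byCustomer = Window.partitionBy("customer_id").orderBy(col("amount").desc)

// Adds a sequential number within each customer_id partition
val ranked = df.withColumn("row_num", row_number().over(byCustomer))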

What is row in Spark Scala?

A row in Spark is an ordered collection of fields that can be accessed starting at index 0. A row is a generic object of type Row; the columns making up the row can be of the same or different types.
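
For example, fields can be read by position or with the typed getters (the values are illustrative):

import org.apache.spark.sql.Row

val row = Row("91xxxxxxxxxx", 21.31, 15)

// Typed accessors, indexed from 0
val id: String    = row.getString(0)
val price: Double = row.getDouble(1)
val count: Int    = row.getInt(2)

// Generic access when the field type is not known up front
val first: Any = row(0)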


1 Answer

Something like the following should work:

import org.apache.spark.sql._

def f(n: List[Int], s: String) : Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
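
The collect keeps only the fields whose index appears in n, and Row.fromSeq turns them into separate columns instead of the single Array-valued field that Row(out) produced in the question. To also keep the customer_id column and feed the result into Spark SQL with a schema, a possible extension might look like the sketch below; the helper g, the column names, and the createDataFrame call are assumptions, not part of the original answer:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Keep column 0 (customer_id) plus the user-selected columns, converting the latter to Double
def g(input: List[Int], s: String): Row = {
  val cols = s.split(",")
  Row.fromSeq(cols(0) +: input.map(i => cols(i).toDouble))
}

val input = List(1, 2)
val rows = sc.textFile("/SparkTest/abc.csv").map(line => g(input, line))

// Hypothetical schema matching the selected columns
val schema = StructType(Seq(
  StructField("customer_id", StringType),
  StructField("col1", DoubleType),
  StructField("col2", DoubleType)
))

// Assumes a SQLContext is available (e.g. sqlContext in the spark-shell)
val df = sqlContext.createDataFrame(rows, schema)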
answered Nov 09 '22 by gruggie