I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on user input, but I can't find a way to construct one directly. Is there any functionality to create a Row from a List or an Array?
For example, if I have a .csv file with the following format:
"91xxxxxxxxxx,21.31,15,0,0"
and the user inputs [1, 2], then I need to take only the 2nd and 3rd columns, along with the customer_id, which is the first column.
I tried to parse it with the following code:
val l3 = sc.textFile("/SparkTest/abc.csv").map(line => f(input, line))
where f is defined as
def f(input: List[Int], s: String): Row = {
  val n = input.length
  val out = new Array[Any](n + 1)
  val r = s.split(",")
  out(0) = r(0)                          // customer_id, always kept
  for (i <- 1 to n)
    out(i) = r(input(i - 1)).toDouble    // user-selected columns
  Row(out)
}
and input is a List, say
val input = List(1,2)
Executing this code, I get l3 as:
Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])
But what I want is:
Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([91xxxxxxxxxx,21.31,15])
The resulting Rows then have to be paired with a schema in Spark SQL.
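In other words, what I'm after is something like this plain-Scala selection (no Spark needed; selectFields is just an illustrative name), which I would then wrap in a Row:

```scala
// Keep the first field as the customer_id (a String) and pull the
// user-chosen columns out as Doubles, in the order they were requested.
def selectFields(input: List[Int], line: String): Seq[Any] = {
  val fields = line.split(",")
  fields(0) +: input.map(i => fields(i).toDouble)
}

selectFields(List(1, 2), "91xxxxxxxxxx,21.31,15,0,0")
// Seq("91xxxxxxxxxx", 21.31, 15.0)
```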
To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala. A Row object can be constructed by providing field values.
A row in Spark is an ordered collection of fields that can be accessed starting at index 0. The row is a generic object of type Row. Columns making up the row can be of the same or different types.
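To illustrate (a sketch assuming spark-sql is on the classpath): Row.apply takes the field values directly, Row.fromSeq builds a Row from an existing collection, and fields are read back by 0-based index:

```scala
import org.apache.spark.sql.Row

// Mixed field types are fine within one Row.
val row = Row("91xxxxxxxxxx", 21.31, 15.0)

val id  = row.getString(0)   // fields are accessed starting at index 0
val col = row.getDouble(1)
val len = row.length         // number of fields

// Row.fromSeq builds an equivalent Row from a Seq of values:
val same = Row.fromSeq(Seq("91xxxxxxxxxx", 21.31, 15.0))
```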
Something like the following should work (include 0 in n if you also want to keep the first column, the customer_id):
import org.apache.spark.sql._
def f(n: List[Int], s: String): Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect { case (a, b) if n.contains(b) => a }.toSeq)
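The zipWithIndex/collect selection itself can be checked in plain Scala without Spark; with n = List(0, 1, 2) it yields exactly the three fields wanted above (note the values stay Strings here, since nothing converts them to Double):

```scala
// Keep only the fields whose position appears in n -- the same
// index-based selection that f wraps in Row.fromSeq.
val n = List(0, 1, 2)
val picked = "91xxxxxxxxxx,21.31,15,0,0"
  .split(",")
  .zipWithIndex
  .collect { case (a, b) if n.contains(b) => a }
  .toSeq
// picked == Seq("91xxxxxxxxxx", "21.31", "15")
```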