 

Iterate rows and columns in Spark dataframe

I have the following Spark dataframe that is created dynamically:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1, sf2, sf3)
val schema = StructType(fields)

val row1 = Row("Andy", "aaa", 20)
val row2 = Row("Berta", "bbb", 30)
val row3 = Row("Joe", "ccc", 40)

val data = Seq(row1, row2, row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

Now I need to iterate over each row and column in sqlDF and print each column value. This is my attempt:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row is of type Row, which is not iterable, so this code fails to compile at row.foreach. How can I iterate over each column in a Row?

ps0604 asked Mar 13 '18 09:03





2 Answers

Suppose you have a DataFrame like the one below:

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

To loop over your DataFrame and extract its elements, you can choose one of the approaches below.

Approach 1 - Loop using foreach

A generic Row does not expose its columns as named, typed fields, so looping over the DataFrame directly is awkward. To get typed access, first describe the schema with a case class and then convert the DataFrame to a typed Dataset of that class with as[...].

import org.apache.spark.sql._
import spark.implicits._  // spark is the active SparkSession, e.g. in spark-shell

case class cls_Employee(name: String, sector: String, age: Int)

val df = Seq(
  cls_Employee("Andy", "aaa", 20),
  cls_Employee("Berta", "bbb", 30),
  cls_Employee("Joe", "ccc", 40)).toDF()

// as[cls_Employee] gives typed access; collect() returns every row to the driver
df.as[cls_Employee].collect().foreach(t =>
  println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

Each row is printed on its own line, for example name=Andy,sector=aaa,age=20.
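For a compiled application rather than the shell, a minimal sketch of Approach 1 applied directly to the question's sqlDF might look like the following. It assumes Spark is on the classpath and runs with a local master; the names TypedLoop, Person and format are hypothetical. The case class is declared at top level because Spark's implicit encoders generally cannot be derived for a case class declared inside a method.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical case class, declared at top level so spark.implicits._ can derive an Encoder for it
case class Person(name: String, sector: String, age: Int)

// Hypothetical object and helper names, for illustration only
object TypedLoop {
  // Formats one typed record the same way as the answer's println
  def format(p: Person): String =
    s"name=${p.name},sector=${p.sector},age=${p.age}"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-loop").getOrCreate()
    import spark.implicits._

    // Same schema and rows as the question
    val schema = StructType(List(
      StructField("name", StringType, nullable = true),
      StructField("sector", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    val data = Seq(Row("Andy", "aaa", 20), Row("Berta", "bbb", 30), Row("Joe", "ccc", 40))
    val sqlDF = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    // as[Person] maps columns to fields by name, so the DataFrame need not be rebuilt.
    // A null age would fail at runtime because the field is a non-nullable Int.
    sqlDF.as[Person].collect().foreach(p => println(format(p)))

    spark.stop()
  }
}
```

Converting the existing DataFrame with as[...] keeps the explicit schema from the question and only adds a typed view on top of it.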

Approach 2 - Loop using rdd

Call rdd.collect on your DataFrame. The loop variable row then holds each row of the DataFrame as a Row. To get the elements of a row, use row.mkString(","), which joins the row's values into a single comma-separated string; the built-in split function then lets you access each column value by index.

for (row <- df.rdd.collect) {
  // Join the row's values with commas, then split them back apart by position
  val values = row.mkString(",").split(",")
  val name = values(0)
  val sector = values(1)
  val age = values(2)
}

Note that this approach has two drawbacks:
1. If a column value contains a comma, the data will be split incorrectly into the adjacent column.
2. rdd.collect is an action that returns all the data to the driver's memory. If the driver does not have enough memory to hold it, the application fails.
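A sketch that works around both drawbacks, assuming the name/sector/age columns above: it reads each field by name with Row.getAs instead of splitting a joined string, and it uses Dataset.toLocalIterator, which fetches one partition at a time to the driver rather than the whole DataFrame (the driver still needs room for the largest partition). The object SafeRowLoop, its helpers and the comma-containing value "Doe, Jane" are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical object and helper names, for illustration only
object SafeRowLoop {
  // Read each column by name instead of splitting mkString output,
  // so commas inside values (drawback 1) do not matter
  def fields(row: Row): (String, String, Int) =
    (row.getAs[String]("name"), row.getAs[String]("sector"), row.getAs[Int]("age"))

  // toLocalIterator pulls one partition at a time to the driver
  // instead of the whole DataFrame at once (eases drawback 2)
  def printAll(df: DataFrame): Unit = {
    val it = df.toLocalIterator()
    while (it.hasNext) {
      val (name, sector, age) = fields(it.next())
      println(s"name=$name,sector=$sector,age=$age")
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("safe-row-loop").getOrCreate()
    import spark.implicits._
    // "Doe, Jane" is a hypothetical value containing a comma
    val df = Seq(("Andy", "aaa", 20), ("Doe, Jane", "bbb", 30)).toDF("name", "sector", "age")
    printAll(df)
    spark.stop()
  }
}
```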

I would recommend Approach 1.

Approach 3 - Using where and select

You can use where and select directly; Spark scans the data internally to find the matching rows. The if condition guards against an index-out-of-bounds exception when no row matches.

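import org.apache.spark.sql.functions.col
import spark.implicits._

var name = ""
// Run the query once, then check for a match before indexing into the result
val matches = df.where($"name" === "Andy").select(col("name")).collect()
if (matches.length >= 1)
  name = matches(0).get(0).toString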
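A variant of Approach 3 that avoids both the repeated query and the explicit length check, sketched under the assumption that one match is enough: Dataset.head(1) returns at most one element, and headOption turns an empty result into None. LookupName and findName are hypothetical names; Encoders.STRING is passed explicitly so the helper does not need spark.implicits._ in scope.

```scala
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical object and helper names, for illustration only
object LookupName {
  // Returns the first matching name, or None when no row matches,
  // so there is no indexing into an empty array
  def findName(df: DataFrame, target: String): Option[String] =
    df.where(col("name") === target)
      .select(col("name"))
      .as[String](Encoders.STRING)
      .head(1)      // brings back at most one row to the driver
      .headOption

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lookup").getOrCreate()
    import spark.implicits._
    val df = Seq(("Andy", "aaa", 20), ("Berta", "bbb", 30), ("Joe", "ccc", 40)).toDF("name", "sector", "age")
    println(findName(df, "Andy"))  // Some(Andy)
    println(findName(df, "Zoe"))   // None
    spark.stop()
  }
}
```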

Approach 4 - Using temp tables

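You can register the DataFrame as a temporary view (registerTempTable in older Spark versions, createOrReplaceTempView since Spark 2.0). The view is only a session-scoped name for the DataFrame; it does not by itself store the data in memory. You can then query it with a SELECT statement as you would a table in any other database, collect the result and save it in a variable.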

df.createOrReplaceTempView("student")
// getString(0) reads the first column directly instead of trimming "[...]" from Row.toString
val name = spark.sql("select name from student where name='Andy'").collect()(0).getString(0)
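Because a temporary view only names the DataFrame, its data is recomputed on each query unless it is cached. A minimal sketch, assuming a local SparkSession, of registering the view, caching it explicitly with spark.catalog.cacheTable and then querying it; TempViewLookup and run are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object and helper names, for illustration only
object TempViewLookup {
  // Registers a temp view, caches it, and looks up Andy with SQL.
  // Returns (cached before cacheTable, cached after cacheTable, name found).
  def run(spark: SparkSession): (Boolean, Boolean, String) = {
    import spark.implicits._
    val df = Seq(("Andy", "aaa", 20), ("Berta", "bbb", 30)).toDF("name", "sector", "age")

    // Only registers a session-scoped name for df; nothing is stored yet
    df.createOrReplaceTempView("student")
    val cachedBefore = spark.catalog.isCached("student")

    // Marks the view's data for in-memory caching; it is filled on first use
    spark.catalog.cacheTable("student")
    val cachedAfter = spark.catalog.isCached("student")

    val name = spark.sql("SELECT name FROM student WHERE name = 'Andy'").collect()(0).getString(0)
    (cachedBefore, cachedAfter, name)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```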
Sarath KS answered Sep 18 '22 23:09



You can convert a Row to a Seq with toSeq. Once it is a Seq, you can iterate over it as usual with foreach, map or any other collection method:

sqlDF.foreach { row =>
  row.toSeq.foreach { col => println(col) }
}

Output (the order can vary between runs, because foreach runs in parallel across partitions):

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
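One caveat: sqlDF.foreach runs on the executors, so on a cluster the println output goes to the executor logs rather than the driver console. For a small DataFrame, a sketch that collects to the driver first and pairs each value with its column name through row.schema could look like this (PrintColumns and namedValues are hypothetical names):

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical object and helper names, for illustration only
object PrintColumns {
  // Pairs each column name (from the row's schema) with its value
  def namedValues(row: Row): Seq[(String, Any)] =
    row.schema.fieldNames.toSeq.zip(row.toSeq)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("print-columns").getOrCreate()
    import spark.implicits._
    val sqlDF = Seq(("Andy", "aaa", 20), ("Berta", "bbb", 30), ("Joe", "ccc", 40)).toDF("name", "sector", "age")

    // collect() brings every row to the driver (small data only), so the
    // output appears on the driver console, in row order
    sqlDF.collect().foreach { row =>
      namedValues(row).foreach { case (column, value) => println(s"$column=$value") }
    }
    spark.stop()
  }
}
```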
SCouto answered Sep 17 '22 23:09
