How to get columns from an org.apache.spark.sql row by name?

I am reading records from a Kafka source into a Spark DataFrame named mydataframe. I want to pick some columns from each row and perform some operations on them. To check whether I am getting the correct index, I tried to print it with println(row.fieldIndex(pathtoDesiredColumnFromSchema)) as shown below:

import org.apache.spark.sql.{ForeachWriter, Row}

val pathtoDesiredColumnFromSchema = "data.root.column1.column2.field"

val myQuery = mydataframe.writeStream.foreach(new ForeachWriter[Row]() {

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit = {
    // Print the index of the desired field within the row.
    println(row.fieldIndex(pathtoDesiredColumnFromSchema))
  }

  override def close(errorOrNull: Throwable): Unit = {}
}).outputMode("append").start()

But the above code throws an error saying that the row has only one field, named data, and that there is no field named data.root.column1.column2.field.
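
That behavior is expected: Row.fieldIndex only resolves top-level column names, so a dotted path into a nested struct is not understood. A minimal sketch of the difference, assuming the schema from the question (a single top-level struct column named data):

// Inside process(row: Row):
println(row.schema.fieldNames.mkString(", "))  // prints only top-level names, e.g. "data"
println(row.fieldIndex("data"))                // 0 -- top-level lookup works
// row.fieldIndex("data.root.column1.column2.field") // throws IllegalArgumentException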

What is the correct way to get column values from a Spark SQL Row by a nested name path?

user3243499 asked Oct 24 '25

1 Answer

You may use a chain of getAs invocations for struct types, for example:

import org.apache.spark.sql.functions._
import spark.implicits._

// Build a small example with a nested struct column ("window").
val df = spark.range(1, 5).toDF.withColumn("time", current_timestamp())
  .union(spark.range(5, 10).toDF.withColumn("time", current_timestamp()))
  .groupBy(window($"time", "1 millisecond")).count


df.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

// Descend into the nested struct level by level:
df.take(1).head
  .getAs[org.apache.spark.sql.Row]("window")
  .getAs[java.sql.Timestamp]("start")
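
To generalize this to a dotted path like the one in the question, you could fold the getAs calls over the path segments. This is only a sketch under the assumption that every intermediate segment is itself a struct; getByPath is a hypothetical helper, not part of the Spark API:

import org.apache.spark.sql.Row

// Hypothetical helper: walk a dotted path through nested struct columns.
// Assumes every segment except the last resolves to a nested Row.
def getByPath(row: Row, path: String): Any = {
  val segments = path.split('.')
  // Descend through the intermediate structs...
  val inner = segments.dropRight(1).foldLeft(row)((r, name) => r.getAs[Row](name))
  // ...then read the final field from the innermost struct.
  inner.getAs[Any](segments.last)
}

// e.g. getByPath(row, "data.root.column1.column2.field")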

Hope it helps!

Mikhail Dubkov answered Oct 26 '25