I am reading records from a Kafka source into a Spark DataFrame called mydataframe.
I want to pick a column from each row and perform an operation on it. To check whether I am getting the correct index, I tried printing it with println(row.getFieldIndex(pathtoDesiredColumnFromSchema)), as shown below:
import org.apache.spark.sql.{ForeachWriter, Row}

val pathtoDesiredColumnFromSchema = "data.root.column1.column2.field"
val myQuery = mydataframe.writeStream.foreach(new ForeachWriter[Row]() {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: Row): Unit = {
    // Fails: the row only has the top-level field "data"
    println(row.getFieldIndex(pathtoDesiredColumnFromSchema))
  }
  override def close(errorOrNull: Throwable): Unit = {}
}).outputMode("append").start()
But the code above reports that the row has only one field name, data, and that there is no field named data.root.column1.column2.field.
What is the correct way to get column values from a Spark SQL Row by a nested name path?
A Row only resolves its own top-level field names, so for struct types you can chain getAs calls, one per nesting level. For example:
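import org.apache.spark.sql.functions.{current_timestamp, window}
import spark.implicits._ // already in scope in spark-shell; needed for $"..."

val df = spark.range(1,5).toDF.withColumn("time", current_timestamp())
  .union(spark.range(5,10).toDF.withColumn("time", current_timestamp()))
  .groupBy(window($"time", "1 millisecond")).count
df.printSchema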
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
df.take(1).head
  .getAs[org.apache.spark.sql.Row]("window")
  .getAs[java.sql.Timestamp]("start")
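For a longer path such as the data.root.column1.column2.field from the question, the same chaining can be wrapped in a small helper. The following is only a sketch: getByPath is a hypothetical name, not part of the Spark API, and it assumes that every segment except the last is a struct column and that the rows carry a schema (as rows coming from a DataFrame do).

```scala
import org.apache.spark.sql.Row

// Hypothetical helper (not a Spark API): follows a dot-separated path
// through nested struct columns, one getAs[Row] call per level.
// Returns None if any struct along the path, or the final value, is null.
def getByPath(row: Row, path: String): Option[Any] = {
  val names = path.split('.')
  // Descend through every segment except the last; each must be a struct.
  val parent = names.init.foldLeft(Option(row)) { (current, name) =>
    current.flatMap(r => Option(r.getAs[Row](name)))
  }
  // Read the final field from the innermost struct.
  parent.flatMap(r => Option(r.get(r.fieldIndex(names.last))))
}
```

Inside process(row) this could be called as getByPath(row, pathtoDesiredColumnFromSchema). A misspelled segment still throws IllegalArgumentException from fieldIndex, which is probably what you want while debugging. Alternatively, selecting the nested column before writeStream, for example mydataframe.select("data.root.column1.column2.field"), lets Spark resolve the path so that the row only contains the field you need.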
Hope it helps!