
java.lang.UnsupportedOperationException: "fieldIndex on a Row without schema is undefined" on row.getAs[String]

The following code throws Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined. The exception occurs when groupByKey and flatMapGroups are invoked a second time, on a dataframe that was itself produced by a groupByKey/flatMapGroups call encoded with an ExpressionEncoder.

Logical flow: originalDf->groupByKey->flatMapGroups->groupByKey->flatMapGroups->show

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer

object Test {

  def main(args: Array[String]): Unit = {

    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._
    val dataFrame = values.toDF

    dataFrame.show()
    dataFrame.printSchema()

    // Extend the original schema with a "Count" column
    val newSchema = StructType(dataFrame.schema.fields
      ++ Array(StructField("Count", IntegerType, false)))

    val expr = RowEncoder.apply(newSchema)

    // First groupByKey/flatMapGroups: appends the group size to every row; this works
    val tranform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
      val inputSeq = inputItr.toSeq

      val length = inputSeq.size
      var listBuff = new ListBuffer[Row]()
      var counter: Int = 0
      for (i <- 0 until length) {
        counter += 1
      }

      for (i <- 0 until length) {
        var x = inputSeq(i)
        listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
      }
      listBuff.iterator
    })(expr)

    tranform.show

    val newSchema1 = StructType(tranform.schema.fields
      ++ Array(StructField("Count1", IntegerType, false)))
    val expr1 = RowEncoder.apply(newSchema1)

    // Second groupByKey: the getAs[String]("_1") below is where
    // "fieldIndex on a Row without schema is undefined" is thrown (Test.scala:59)
    val tranform2 = tranform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
      val inputSeq = inputItr.toSeq

      val length = inputSeq.size
      var listBuff = new ListBuffer[Row]()
      var counter: Int = 0
      for (i <- 0 until length) {
        counter += 1
      }

      for (i <- 0 until length) {
        var x = inputSeq(i)
        listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
      }
      listBuff.iterator
    })(expr1)

    tranform2.show
  }
}

Following is the stack trace:

18/11/21 19:39:03 WARN TaskSetManager: Lost task 144.0 in stage 11.0 (TID 400, localhost, executor driver): java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
at org.apache.spark.sql.Row$class.fieldIndex(Row.scala:342)
at org.apache.spark.sql.catalyst.expressions.GenericRow.fieldIndex(rows.scala:166)
at org.apache.spark.sql.Row$class.getAs(Row.scala:333)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getAs(rows.scala:166)
at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:300)
at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:298)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

How to fix this code?

asked Nov 20 '18 by Bay Max




2 Answers

The reported problem can be avoided by replacing the field-name version of the getAs[T] method (used in the key function for groupByKey):

groupByKey(row => row.getAs[String]("_1"))

with the field-position version:

groupByKey(row => row.getAs[String](fieldIndexMap("_1")))

where fieldIndexMap maps field names to their corresponding field indexes:

val fieldIndexMap = tranform.schema.fieldNames.zipWithIndex.toMap
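
To see why positional access sidesteps the exception, here is a minimal sketch (runnable in a Spark shell; the literal values are illustrative): getAs[T](i: Int) reads the value at position i directly, while the name-based overload must first call fieldIndex, which a schema-less row cannot answer.

import org.apache.spark.sql.Row

val r = Row.fromSeq(Seq("1", "One"))  // a GenericRow with no schema attached
r.getAs[String](0)                    // "1": positional access needs no schema
// r.getAs[String]("_1")              // throws: fieldIndex on a Row without schema is undefined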

As a side note, your flatMapGroups function can be simplified to something like the following:

val tranform2 = tranform.groupByKey(_.getAs[String](fieldIndexMap("_1"))).
  flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    val length = inputSeq.size
    inputSeq.map(r => Row.fromSeq(r.toSeq :+ length))
  })(expr1)
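
For reference, once the key functions no longer rely on field names, tranform2.printSchema should report the two appended count columns (row order after the shuffle is not guaranteed):

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- Count: integer (nullable = false)
 |-- Count1: integer (nullable = false)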

The inconsistent behavior between applying the original groupByKey/flatMapGroups methods to "dataFrame" versus "tranform" comes down to what the key function receives: the first groupByKey sees rows deserialized through the DataFrame's own encoder, which attaches the schema, whereas the second is fed the schema-less GenericRow objects created by Row.fromSeq inside the first flatMapGroups (see the second answer below).

answered Nov 24 '22 by Leo C


Solution as received from the Spark project's JIRA: https://issues.apache.org/jira/browse/SPARK-26436

This issue is caused by how you create the row:

listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))

Row.fromSeq creates a GenericRow, and GenericRow's fieldIndex is not implemented because a GenericRow carries no schema.

Changing the line to create a GenericRowWithSchema (plus its import) solves it:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
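
As a minimal sketch of the difference between the two row types (runnable in a Spark shell; the one-column schema here is illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("_1", StringType)))

new GenericRowWithSchema(Array[Any]("1"), schema).getAs[String]("_1")  // "1": fieldIndex resolved via the attached schema
Row.fromSeq(Seq("1")).getAs[String]("_1")                              // throws UnsupportedOperationException
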
answered Nov 24 '22 by Bay Max