Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ArrayIndexOutOfBoundsException when reading csv file in spark

I'm on the second chapter of Advanced Analytics with Spark, second Edition. I'm using

val parsed = spark.read.csv("linkage")

and getting this error:

18/01/16 12:09:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArrayIndexOutOfBoundsException: 63
    at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:190)
    at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:205)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
18/01/16 12:09:32 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 63
    at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:190)
    at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:205)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

18/01/16 12:09:32 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 63
    at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:190)
    at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:205)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:147)
  at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:62)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 63
  at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:190)
  at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:205)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)

At first, I thought there was a problem in my dataset, so I checked if there are the same number of commas between elements for each line of all files. There are 11 commas:

scala> import scala.io.Source
import scala.io.Source

scala> Source.fromFile("linkage/block_1.csv").getLines take 10 foreach println
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

scala> val fileNames = (1 to 10) map (num => s"block_$num.csv")
fileNames: scala.collection.immutable.IndexedSeq[String] = Vector(block_1.csv, block_2.csv, block_3.csv, block_4.csv, block_5.csv, block_6.csv, block_7.csv, block_8.csv, block_9.csv, block_10.csv)

scala> fileNames foreach { fn =>
     | val lines = Source.fromFile(s"linkage/$fn").getLines.toList
     | val notMatching = lines filterNot (line => ((line count(c => c == ',')) == 11))
     | notMatching foreach println
     | println("\n")
     | }

The output of this was just "\n", so there are always 11 commas between values for each line.

I'm using these versions:

Spark version: 2.2.0
Scala version: 2.11.8
Hadoop version: 2.8.2

I'm launching the spark shell on a local machine like this:

spark-shell --master local[*] --driver-memory 2g

The linkage directory is right under where I launch the spark-shell.

Any ideas? Maybe I'm doing something stupid, as this is my first time with Spark/Scala. I'm also not familiar with Java, if that may be the issue. Thank you.

like image 870
Samir Gadkari Avatar asked Jan 16 '18 20:01

Samir Gadkari


People also ask

How do I read a CSV file in Spark context?

To read multiple CSV files in Spark, just use textFile() method on SparkContext object by passing all file names comma separated. The below example reads text01. csv & text02. csv files into single RDD.

How do I read a CSV file with delimiter in Spark?

Use spark. read. option("delimiter", "\t"). csv(file) or sep instead of delimiter .

What happens in Spark when we read a file?

Thanks for guidance but here when Spark read data/file , definitely it will store that data which it has read. So where it will store this data.. If it won't store so what is happening on reading the file.


1 Answers

This was a silly issue. I was reading like this:

val parsed = spark.read.csv("linkage")

when it should have been:

val parsed = spark.read.csv("linkage/block_*.csv")

This is because there was another csv file (frequencies.csv) which had less commas in it. This file just gives the frequencies of a number of columns, but not all.

like image 82
Samir Gadkari Avatar answered Nov 15 '22 08:11

Samir Gadkari