 

spark: SAXParseException while writing to parquet on s3

I'm trying to read in some json, infer a schema, and write it out again as parquet to s3 (s3a). For some reason, about a third of the way through the writing portion of the run, spark always errors out with the error included below. I can't find any obvious reasons for the issue: it isn't out of memory; there are no long GC pauses. There don't seem to be any additional error messages in the logs of the individual executors.

The script runs fine on another set of data that I have, which is of a very similar structure, but several orders of magnitude smaller.

I am running spark 2.0.1-hadoop-2.7 and am using the FileOutputCommitter. The algorithm version doesn't seem to matter.
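
For reference, the committer algorithm is controlled by the standard Hadoop key, which I set through my spark conf along the lines of:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2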

Edit: This does not appear to be a problem with badly formed json or corrupted files. I have unzipped and read in each file individually with no error.

Here's a simplified version of the script:

import org.json4s._
import org.json4s.jackson.JsonMethods._   // for parseOpt / JValue

object Foo {

  def parseJson(json: String): Option[Map[String, Any]] = {
    if (json == null)
      Some(Map())
    else
      parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]])
  }
}

// read in as text and parse json using json4s
val jsonRDD: RDD[String] = sc.textFile(inputPath)
    .map(row => Foo.parseJson(row))

// infer a schema that will encapsulate the most rows in a sample of size sampleRowNum
val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)

// get documents compatibility with schema
val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
  .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean))
  .repartition(partitions)

val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
  .filter { case (js: String, compatible: Boolean) => compatible }
  .map { case (js: String, _: Boolean) => js }

// create a dataframe from documents with compatible schema
val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)
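
The write itself is omitted from the simplified script; it is just the standard DataFrame parquet write to an s3a path, roughly as follows (outputPath stands in for the real s3a:// URI):

// write back out as parquet to s3 (s3a)
dataFrame.write.parquet(outputPath)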

It completes the earlier schema inferring steps successfully. The error itself occurs on the last line, but I suppose it could encompass at least the immediately preceding statement, if not earlier:

org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to commit task
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    ... 8 more
    Suppressed: java.lang.NullPointerException
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
        at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354)
        ... 9 more
Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler). Response Code: 200, Response Text: OK
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
    at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267)
    ... 13 more
Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150)
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279)
    at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75)
    at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72)
    at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
    at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712)
    ... 29 more
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.
    at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
    at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
    at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown Source)
    at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source)
    at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source)
    at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
    at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141)
    ... 35 more

Here's my conf:

spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
spark.executor.memory   16G
spark.executor.uri  https://s3.amazonaws.com/foo/spark-2.0.1-bin-hadoop2.7.tgz
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.buffer.dir  /raid0/spark
spark.hadoop.fs.s3n.buffer.dir  /raid0/spark
spark.hadoop.fs.s3a.connection.timeout 500000
spark.hadoop.fs.s3n.multipart.uploads.enabled   true
spark.hadoop.parquet.block.size 2147483648
spark.hadoop.parquet.enable.summary-metadata    false
spark.jars.packages com.databricks:spark-avro_2.11:3.0.1
spark.local.dir /raid0/spark
spark.mesos.coarse  false
spark.mesos.constraints  priority:1
spark.network.timeout   600
spark.rpc.message.maxSize    500
spark.speculation   false
spark.sql.parquet.mergeSchema   false
spark.sql.planner.externalSort  true
spark.submit.deployMode client
spark.task.cpus 1

asked Oct 26 '16 by Luke


3 Answers

I can think of three possible reasons for this problem.

  1. JVM version. The AWS SDK checks for the following versions: "1.6.0_06", "1.6.0_13", "1.6.0_17", "1.6.0_65", "1.7.0_45". If you are using one of them, try upgrading.
  2. Old AWS SDK. Refer to https://github.com/aws/aws-sdk-java/issues/460 for a workaround.
  3. If you have lots of files in the directory you are writing to, you might be hitting https://issues.apache.org/jira/browse/HADOOP-13164. Consider increasing the timeout to a larger value, as sketched just below this list.
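
For the timeout, in the same spark-defaults form as the conf in your question (the exact value is a guess; you already have fs.s3a.connection.timeout at 500000, so try something larger):

spark.hadoop.fs.s3a.connection.timeout 2000000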

answered Oct 18 '22 by Rohit Karlupia


A SAXParseException usually indicates a badly formatted XML file. Since the job consistently fails roughly a third of the way through, it is probably failing in the same place every time, i.e. on a file whose partition sits roughly a third of the way through the partition list.

Can you paste your script? It may be possible to wrap the Spark step in a try/catch block that prints out the offending file when this error occurs, which would let you zoom in on the problem quickly.
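
Something along these lines might do it (a rough sketch, not tested: it assumes the inputs are newline-delimited json, that json4s is available as in your script, and that each file is small enough to read whole on an executor):

import org.json4s._
import org.json4s.jackson.JsonMethods._

// read each file whole so the path is kept alongside the content,
// then keep only the paths whose contents fail to parse
val badFiles = sc.wholeTextFiles(inputPath)
  .flatMap { case (path, content) =>
    val allLinesParse = content.split("\n").filter(_.trim.nonEmpty).forall(line => parseOpt(line).isDefined)
    if (allLinesParse) None else Some(path)
  }
  .collect()

badFiles.foreach(println)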

answered Oct 18 '22 by Tim


From the logs:

Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.

and

Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler

It looks like you have a corrupted or incorrectly formatted file, and the error is actually occurring during the read portion of the task. You could confirm this by trying another operation that forces the read, such as count().
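
For example, reusing the names from your script, something as simple as

dataFrame.count()

would force the read without writing anything out.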

If confirmed, the goal would then be to find the corrupted file. You could do this by listing the s3 files, parallelizing that list with sc.parallelize(), and then trying to read each file in a custom function applied with map(). For example, with boto3:

import json
import boto3
from pyspark.sql import Row

def scanKeys(startKey, endKey):
    bucket = boto3.resource('s3').Bucket('bucketName')
    for obj in bucket.objects.filter(Prefix='prefix', Marker=startKey):
        if obj.key < endKey:
            yield obj.key
        else:
            return

def testFile(s3Path):
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Path)
    body = s3obj.get()['Body'].read()
    # Test the file format: here we try to parse every non-empty line as JSON.
    # Adjust this (e.g. decompress first) if your objects are gzipped.
    try:
        for line in body.splitlines():
            if line.strip():
                json.loads(line)
        return Row(status='Good', key=s3Path)
    except Exception:
        return Row(status='Fail', key=s3Path)


keys = list(scanKeys(startKey, endKey))
keyListRdd = sc.parallelize(keys, 1000)
keyListRdd.map(testFile).filter(lambda x: x['status'] == 'Fail').collect()

This will return the s3 paths of the incorrectly formatted files.

answered Oct 18 '22 by David