pyspark 1.6.0 write to parquet gives "path exists" error

I read in parquet files from several folders, e.g. all of February this year (one folder = one day):

indata = sqlContext.read.parquet('/data/myfolder/201602*')

do some very simple grouping and aggregation,

outdata = indata.groupby(...).agg()
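
For concreteness, a purely hypothetical version of that elided line (the column names 'day' and 'user_id' are invented, not from the real data) could look like:

# Illustrative only: 'day' and 'user_id' are made-up column names.
outdata = indata.groupby('day').agg(
    countDistinct('user_id').alias('unique_users'),
    func.count(lit(1)).alias('n_rows'),
)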

and want to store the result again:

outdata.write.parquet(outloc)

Here is how I run the script from bash:

spark-submit \
  --master yarn-cluster \
  --num-executors 16 \
  --executor-cores 4 \
  --driver-memory 8g \
  --executor-memory 16g \
  --files /etc/hive/conf/hive-site.xml \
  --driver-java-options -XX:MaxPermSize=512m \
  spark_script.py

This generates multiple jobs (is that the right term?). The first job runs successfully; subsequent jobs fail with the following error:

  Traceback (most recent call last):
  File "spark_generate_maps.py", line 184, in <module>
    outdata.write.parquet(outloc)
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 471, in parquet
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
pyspark.sql.utils.AnalysisException: u'path OBFUSCATED_PATH_THAT_I_CLEANED_BEFORE_SUBMIT already exists.;'

When I give only one folder as input, this works fine.

So it seems the first job creates the folder, and all subsequent jobs fail to write into that folder. Why?

Just in case this helps anybody:

imports:

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, collect_list, countDistinct, count
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
import numpy as np
import sys
import math

config:

conf = SparkConf().setAppName('spark-compute-maps').setMaster('yarn-cluster')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
asked Dec 09 '16 by mad

2 Answers

Your question is effectively "why does Spark iterate over the input folders, yet apply the default write mode, which makes no sense in that context".

Quoting the Spark 1.6 Python API documentation:

mode(saveMode)
    Specifies the behavior when data or table already exists.
    Options include:
        append     Append contents of this DataFrame to existing data.
        overwrite  Overwrite existing data.
        error      Throw an exception if data already exists.
        ignore     Silently ignore this operation if data already exists.

I think outdata.write.mode('append').parquet(outloc) is worth a try.
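
For comparison, here is a minimal sketch of all four modes applied to the writer from the question (outdata and outloc as above):

outdata.write.mode('append').parquet(outloc)     # add new files alongside existing ones
outdata.write.mode('overwrite').parquet(outloc)  # replace whatever is already at outloc
outdata.write.mode('ignore').parquet(outloc)     # silently skip the write if outloc exists
outdata.write.parquet(outloc)                    # default is 'error': the AnalysisException above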

answered by Samson Scharfrichter


You should add the mode option to your write call:

outdata.write.mode('append').parquet(outloc)
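
If appending is not what you want (e.g. each run should produce fresh output), a commonly used alternative is to delete the existing path first via the Hadoop FileSystem API. Note this goes through py4j internals (sc._jvm, sc._jsc), so treat it as a sketch rather than a supported public API; sc and outloc are the SparkContext and output path from the question:

# Sketch: remove outloc before writing, using the JVM Hadoop FileSystem.
hadoop_conf = sc._jsc.hadoopConfiguration()
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
path = sc._jvm.org.apache.hadoop.fs.Path(outloc)
if fs.exists(path):
    fs.delete(path, True)  # True = recursive delete
outdata.write.parquet(outloc)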
answered by Saurabh Chakraborty