I read Parquet files from several folders, e.g. all of February this year (one folder = one day):
indata = sqlContext.read.parquet('/data/myfolder/201602*')
then do some very simple grouping and aggregation:
outdata = indata.groupby(...).agg()
and want to store the result again:
outdata.write.parquet(outloc)
Here is how I run the script from bash:
spark-submit \
  --master yarn-cluster \
  --num-executors 16 \
  --executor-cores 4 \
  --driver-memory 8g \
  --executor-memory 16g \
  --files /etc/hive/conf/hive-site.xml \
  --driver-java-options -XX:MaxPermSize=512m \
  spark_script.py
This generates multiple jobs (is that the right term?). The first job runs successfully; subsequent jobs fail with the following error:
Traceback (most recent call last):
File "spark_generate_maps.py", line 184, in <module>
outdata.write.parquet(outloc)
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 471, in parquet
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
pyspark.sql.utils.AnalysisException: u'path OBFUSCATED_PATH_THAT_I_CLEANED_BEFORE_SUBMIT already exists.;'
When I give only one folder as input, this works fine.
So it seems the first job creates the folder, and all subsequent jobs then fail to write into it. Why?
Just in case this could help anybody:
imports:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, collect_list, countDistinct, count
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
import numpy as np
import sys
import math
config:
conf = SparkConf().setAppName('spark-compute-maps').setMaster('yarn-cluster')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
Your question is: "Why does Spark iterate over the input folders but apply the default write
mode, which makes no sense in that context?"
Quoting the Spark 1.6 Python API documentation:
mode(saveMode)
    Specifies the behavior when data or table already exists.
    Options include:
        append: Append contents of this DataFrame to existing data.
        overwrite: Overwrite existing data.
        error: Throw an exception if data already exists.
        ignore: Silently ignore this operation if data already exists.
The default mode is error, which is what raises the "already exists" exception here, so I think
outdata.write.mode('append').parquet(outloc)
is worth a try.
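To make the four options quoted above concrete, here is a toy model in plain Python. It is only an illustration of the documented semantics, not Spark's implementation: the names save and load, the JSON "part" files and the temporary folder are all made up for this sketch.

```python
import json
import os
import tempfile

SAVE_MODES = ("append", "overwrite", "error", "ignore")


def save(rows, path, mode="error"):
    """Toy stand-in for a DataFrame write: one JSON part file per call."""
    if mode not in SAVE_MODES:
        raise ValueError("unknown save mode: %r" % (mode,))
    exists = os.path.isdir(path)
    if exists and mode == "error":
        # Mirrors the failure in the question: the target is already there.
        raise IOError("path %s already exists." % path)
    if exists and mode == "ignore":
        return  # keep the existing data and write nothing
    if exists and mode == "overwrite":
        for name in os.listdir(path):
            os.remove(os.path.join(path, name))
    if not exists:
        os.makedirs(path)
    # "append" (or a fresh folder) simply adds one more part file.
    part = os.path.join(path, "part-%05d.json" % len(os.listdir(path)))
    with open(part, "w") as handle:
        json.dump(rows, handle)


def load(path):
    """Read back every part file in name order."""
    rows = []
    for name in sorted(os.listdir(path)):
        with open(os.path.join(path, name)) as handle:
            rows.extend(json.load(handle))
    return rows


out = os.path.join(tempfile.mkdtemp(), "out")
save([{"day": "20160201"}], out)  # first write creates the folder
try:
    save([{"day": "20160202"}], out)  # default mode: the folder now exists
    second_write_failed = False
except IOError:
    second_write_failed = True
save([{"day": "20160202"}], out, mode="append")  # adds to the existing data
```

In this model the second default-mode write fails exactly because the first one created the folder, while the append call succeeds and leaves both days in the output.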
You should add the mode option to your code:
outdata.write.mode('append').parquet(outloc)
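If the write happens in more than one place, it may help to wrap it so the save mode is always spelled out. The helper below is a hypothetical sketch (write_parquet and SAVE_MODES are names invented here); the only Spark calls it relies on are DataFrame.write, mode() and parquet(), as used above.

```python
# The four save modes listed in the Spark 1.6 Python API documentation.
SAVE_MODES = ("append", "overwrite", "error", "ignore")


def write_parquet(df, outloc, mode="append"):
    """Write df to outloc as Parquet with an explicit save mode.

    Hypothetical helper: df is expected to be a Spark DataFrame and
    outloc the output folder from the question.
    """
    if mode not in SAVE_MODES:
        raise ValueError("unknown save mode: %r" % (mode,))
    df.write.mode(mode).parquet(outloc)


# Usage, with the names from the question:
# write_parquet(outdata, outloc)               # append to existing output
# write_parquet(outdata, outloc, "overwrite")  # replace existing output
```

Keep in mind that with append, re-running the script against an output folder that still holds an earlier run's results will add the same rows again, so overwrite may be the better choice for repeatable runs.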