I have a PySpark script that loads data from a TSV file, saves it as Parquet files, and also saves it as a persistent SQL table.
When I run it line by line in the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors, but I get strange results: 1. The data is overwritten instead of appended. 2. When I run SQL queries against it, no data is returned, even though the parquet files are several gigabytes in size (which is what I expect). Any suggestions?
Code:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'
sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)
fields = [StructField('time_stamp', TimestampType(), True),
          StructField('subscriberId', StringType(), True),
          StructField('sourceIPv4Address', StringType(), True),
          StructField('destinationIPv4Address', StringType(), True),
          StructField('service', StringType(), True),
          StructField('baseService', StringType(), True),
          StructField('serverHostname', StringType(), True),
          StructField('rat', StringType(), True),
          StructField('userAgent', StringType(), True),
          StructField('accessPoint', StringType(), True),
          StructField('station', StringType(), True),
          StructField('device', StringType(), True),
          StructField('contentCategories', StringType(), True),
          StructField('incomingOctets', LongType(), True),
          StructField('outgoingOctets', LongType(), True),
          StructField('incomingShapingDrops', IntegerType(), True),
          StructField('outgoingShapingDrops', IntegerType(), True),
          StructField('qoeIncomingInternal', DoubleType(), True),
          StructField('qoeIncomingExternal', DoubleType(), True),
          StructField('qoeOutgoingInternal', DoubleType(), True),
          StructField('qoeOutgoingExternal', DoubleType(), True),
          StructField('incomingShapingLatency', DoubleType(), True),
          StructField('outgoingShapingLatency', DoubleType(), True),
          StructField('internalRtt', DoubleType(), True),
          StructField('externalRtt', DoubleType(), True),
          StructField('HttpUrl', StringType(), True)]
schema = StructType(fields)
df = spark.read.load(csv_file, format='csv', sep='\t', header=True,
                     schema=schema, timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date', to_date('time_stamp'))
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)
Spark Submit Python File
The Apache Spark binary distribution comes with a spark-submit script for Linux and Mac and a spark-submit.cmd command file for Windows. These scripts are available in the $SPARK_HOME/bin directory and are used to submit a PySpark file with a .py extension (Spark with Python) to the cluster.
There is no practical difference between these two. Unless configured otherwise, both execute code in local mode. If a master is configured (either with the --master command-line parameter or the spark.master configuration property), the corresponding cluster is used to execute the program.
Once you run spark-submit, a driver program is launched. The driver requests resources from the cluster manager and, at the same time, starts the main function of the user's program.
You can find the spark-submit script in the bin directory of the Spark distribution.
$ ./bin/spark-submit
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
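To make the usage line concrete, here is a minimal Python sketch that assembles a spark-submit command following the `[options] <python file> [app arguments]` pattern. The helper name `build_submit_command`, the file name `import_ipfixminute.py`, and the `local[4]` master are hypothetical and only there for illustration; the sketch assumes SPARK_HOME points at the Spark distribution and otherwise falls back to whatever `spark-submit` is on the PATH.

```python
import os
import shlex


def build_submit_command(app_file, master=None, app_args=()):
    """Return the argv list for a spark-submit invocation (illustrative helper)."""
    # Assumption: SPARK_HOME, when set, is the root of the Spark distribution.
    spark_home = os.environ.get("SPARK_HOME")
    if spark_home:
        script = os.path.join(spark_home, "bin", "spark-submit")
    else:
        script = "spark-submit"  # rely on PATH lookup

    cmd = [script]
    if master:
        # Same effect as setting spark.master in the configuration.
        cmd += ["--master", master]
    cmd.append(app_file)    # the .py file to run
    cmd.extend(app_args)    # arguments passed through to the application
    return cmd


# Hypothetical file name and master, used only to show the resulting command.
cmd = build_submit_command("import_ipfixminute.py", master="local[4]")
print(" ".join(shlex.quote(part) for part in cmd))
```

The returned list can be passed to `subprocess.run(cmd, check=True)` if you want to launch the job from Python rather than from a shell.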
As @user8371915 suggested, this is similar to the following question:
Spark can access Hive table from pyspark but not from spark-submit
I needed to replace
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
with
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
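This resolved the issue. Presumably the pyspark shell gives you a Hive-enabled context by default, while a plain SQLContext keeps its table catalog in memory, so tables written with saveAsTable are not registered in the persistent metastore.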
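On Spark 2.0 and later, HiveContext is deprecated in favour of SparkSession with Hive support enabled. The following is a minimal sketch of that equivalent, not the code from the original script: it assumes Spark 2.x or newer built with Hive support, and the function names `build_session` and `main` are made up for illustration.

```python
def build_session(app_name="import-ipfixminute"):
    """Create a SparkSession that uses the persistent Hive catalog."""
    # Imported inside the function so this file can be loaded on a machine
    # without PySpark; in a real job a top-level import is more usual.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .enableHiveSupport()  # plays the role HiveContext(sc) played before 2.0
        .getOrCreate()
    )


def main():
    spark = build_session()
    # Expected to report "hive" when Hive support is active and
    # "in-memory" for a plain session.
    print(spark.conf.get("spark.sql.catalogImplementation"))
    spark.stop()
```

Calling `main()` from a script run with spark-submit is a quick way to check which catalog the application actually uses, which is the difference that matters for saveAsTable.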