
Pyspark: spark-submit not working like CLI

I have a PySpark script that loads data from a TSV file, saves it as Parquet files, and also registers it as a persistent SQL table.

When I run it line by line in the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors but I get strange results: 1. The data is overwritten instead of appended. 2. When I run SQL queries against the table, no data is returned, even though the parquet files are several gigabytes in size (which is what I expect). Any suggestions?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
                StructField('subscriberId', StringType(), True),
                StructField('sourceIPv4Address', StringType(), True),
                StructField('destinationIPv4Address', StringType(), True),
                StructField('service',StringType(), True),
                StructField('baseService',StringType(), True),
                StructField('serverHostname', StringType(), True),
                StructField('rat', StringType(), True),
                StructField('userAgent', StringType(), True),
                StructField('accessPoint', StringType(), True),
                StructField('station', StringType(), True),
                StructField('device', StringType(), True),
                StructField('contentCategories', StringType(), True),
                StructField('incomingOctets', LongType(), True),
                StructField('outgoingOctets', LongType(), True),
                StructField('incomingShapingDrops', IntegerType(), True),
                StructField('outgoingShapingDrops', IntegerType(), True),
                StructField('qoeIncomingInternal', DoubleType(), True),
                StructField('qoeIncomingExternal', DoubleType(), True),
                StructField('qoeOutgoingInternal', DoubleType(), True),
                StructField('qoeOutgoingExternal', DoubleType(), True),
                StructField('incomingShapingLatency', DoubleType(), True),
                StructField('outgoingShapingLatency', DoubleType(), True),
                StructField('internalRtt', DoubleType(), True),
                StructField('externalRtt', DoubleType(), True),
                StructField('HttpUrl',StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)
asked May 22 '18 14:05 by Mikhail Venkov




1 Answer

As @user8371915 suggested, this is similar to the following question:

Spark can access Hive table from pyspark but not from spark-submit

I needed to replace

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

with

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

This resolved the issue.
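On Spark 2.0 and later, HiveContext is deprecated in favour of a SparkSession built with enableHiveSupport(), which is also how the interactive pyspark shell typically sets up its `spark` object when Hive classes are available. The following is a minimal sketch of that alternative, not the code I actually ran: the helper names build_session, save_options and append_to_table are hypothetical, and the table name and path are simply copied from the question.

```python
# Sketch only: helper names below are illustrative, not part of PySpark.
TABLE_NAME = 'test2'
PARQUET_DIR = '/srv/spark/data/parquet/ipfixminute'


def build_session(app_name='import-ipfixminute'):
    """Create (or reuse) a SparkSession that uses the Hive catalog."""
    # Imported inside the function so the module loads even where
    # PySpark is not on the Python path (e.g. outside spark-submit).
    from pyspark.sql import SparkSession
    return (SparkSession.builder
            .appName(app_name)
            .enableHiveSupport()   # persistent table metadata, like HiveContext
            .getOrCreate())


def save_options(path=PARQUET_DIR):
    """Keyword arguments for DataFrameWriter.saveAsTable, as in the question."""
    return {'mode': 'append', 'partitionBy': 'date', 'path': path}


def append_to_table(df, table=TABLE_NAME, path=PARQUET_DIR):
    """Append df to the persistent table, partitioned by the 'date' column."""
    df.write.saveAsTable(table, **save_options(path))
```

In the submitted script, `spark = build_session()` would replace the SparkContext / SQLContext lines, and `append_to_table(df)` would replace the final saveAsTable call. Printing `spark.conf.get('spark.sql.catalogImplementation')` is one way to check which catalog is in use: I would expect 'hive' with Hive support enabled and 'in-memory' otherwise.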

answered Oct 04 '22 17:10 by Mikhail Venkov