
Spark program takes a really long time to complete execution

Spark is supposed to finish data processing at lightning speed, but I guess I'm not using the right functionality to make my program work that way.

This is what my program looks like:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
import re
import time

start_time = time.time()
sc = SparkContext("local","test")
sqlContext = SQLContext(sc)

def func1(lines):
    for line in lines:
        qtype = re.search("qtype=(\S+)",line)
        try:
            url = re.search(" url=(\S+)",line)
            url=url.group(1)
        except:
            url="null"

        time = re.search("(^\S+)",line)
        .... #extracting field names
        row = Row(time = time.group(1),ttms = ttms.group(1),chi = chi.group(1),pssc = pssc.group(1),cqhm = cqhm.group(1),rtype = rtype.group(1),rdetails = rdetails.group(1),rurl = rurl.group(1),qtype = qtype.group(1),phn = phn.group(1),fqdn = fqdn,url = url)

    return row

file = sc.textFile("C:\\Logs\\filename.log").cache()
result = file.map(lambda x: x.split("\n"))
lines = result.map(func1).collect()
df = sqlContext.createDataFrame(lines)
df.registerTempTable("df")
df.show()

for line in df :
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()

df1 = sqlContext.createDataFrame(query1)
df2 = sqlContext.createDataFrame(query2)
df3 = sqlContext.createDataFrame(query3)
df4 = sqlContext.createDataFrame(query4)
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")
print(time.time() - start_time)

This program takes almost 200 seconds to execute, which is a very long time, and I can't figure out why. (My log file contains around 34k log lines.) I tried using Spark's filter for the regex extraction, but I get an error saying the RDD is not iterable. So I'd like to know how I can optimize my program to make it run faster. I also get a warning that stage x contains a task of very large size; I tried broadcasting lines, but that gave an error as well.

asked Nov 09 '16 by kavya


1 Answer

Some reasons why this Spark code will run slower than pure Python code:

1) Using a single machine

sc = SparkContext("local","test")

PySpark can perform better than pure Python when running on a cluster; in "local" mode, Spark has no advantage over pure Python.
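As a sketch only (the cluster master URL below is hypothetical), the master argument is where this would change; on a single machine, "local[*]" at least uses all available cores instead of a single thread:

from pyspark import SparkContext

# Use all local cores instead of one thread...
sc = SparkContext("local[*]", "test")
# ...or point at a cluster (hypothetical master URL):
# sc = SparkContext("spark://master-host:7077", "test")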

2) Using cache() when it isn't needed

file = sc.textFile("C:\\Logs\\filename.log").cache()

.cache() should be used only when the same object is used more than once. "file" is used only once, so there is no need to cache it.
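cache() does pay off when the same RDD feeds several actions. A sketch under that assumption (func1 is the parsing function from the question, and the qtype values are only illustrative):

rdd = sc.textFile("C:\\Logs\\filename.log")    # read once, so no cache() needed here
parsed = rdd.map(func1).cache()                # cached because it is reused below
http_count = parsed.filter(lambda r: r.qtype == "HTTP").count()  # first action fills the cache
dns_count = parsed.filter(lambda r: r.qtype == "DNS").count()    # second action reuses it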

3) Using collect()

lines = result.map(func1).collect()

for line in df :
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()

A general rule: avoid using collect() unless it is really needed.
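A sketch of the same steps without collect(), assuming func1 is adjusted to parse a single line and return a Row; everything stays distributed until the small aggregated result is actually needed:

rows = file.map(func1)                      # RDD of Row objects, stays on the executors
df = sqlContext.createDataFrame(rows)       # no collect() in between
df.registerTempTable("df")
qtype_counts = sqlContext.sql("SELECT qtype, COUNT(*) AS cnt FROM df GROUP BY qtype")
qtype_counts.show()                         # only the aggregated rows reach the driver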

4) Using toPandas()

df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")

"toPandas()" implementation start with execution of "collect()" (see #3)

Since Spark 2.0, you can write a DataFrame to CSV directly:

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
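Applied to this program it could look like the line below (a sketch: query1_df stands for a DataFrame holding the first query's result, the output path is illustrative, Spark writes a directory of part files, and coalesce(1) only forces a single output file):

query1_df.coalesce(1).write.csv("C:\\Sample logs\\sqlcsv1", header=True)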

5) I'm not sure that I understand the code below:

for line in df :
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()

What are you trying to achieve with "for line in df:"?

If the DataFrame holds 100,000 rows, do you plan to execute this for-loop 100,000 times?

It also seems that query1, query2, query3 and query4 will only hold the result of the last iteration of the for-loop (their values appear to be overwritten every time a new "line" is read from "df") - is that intentional?
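If the intent was simply to run each aggregation once over the whole table, the loop can be dropped entirely. A sketch of two of the queries (query1_df/query4_df are the same illustrative names as in #4, and COUNT(*) is used instead of the multi-column COUNT):

query1_df = sqlContext.sql("SELECT qtype, rtype, COUNT(*) AS cnt FROM df GROUP BY qtype, rtype")
query4_df = sqlContext.sql("SELECT rtype, COUNT(*) AS cnt FROM df WHERE qtype = 'HTTP' GROUP BY rtype")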

6) You can create a DataFrame directly from an RDD

e.g. using

sqlContext.createDataFrame

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

Creates a DataFrame from an RDD, a list or a pandas.DataFrame.

or

RDD.toDF()

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

toDF(*cols)

Returns a new DataFrame with the specified new column names.

Parameters: cols – list of new column names (string)

>>> df.toDF('f1', 'f2').collect()
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')]
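For this program, both routes might look roughly like this (a sketch, again assuming func1 is adjusted to parse one line and return a Row):

rows = sc.textFile("C:\\Logs\\filename.log").map(func1)   # RDD of Row objects
df = sqlContext.createDataFrame(rows)                      # route 1: SQLContext.createDataFrame
# df = rows.toDF()                                         # route 2: RDD.toDF() (needs an active SQLContext)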
answered Sep 20 '22 by Yaron