Spark is supposed to process data at lightning speed, but I guess I'm not using the right functionality to make my program work that way.
This is what my program looks like:
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyparsing import re
import time

start_time = time.time()

sc = SparkContext("local","test")
sqlContext = SQLContext(sc)

def func1(lines):
    for line in lines:
        qtype = re.search("qtype=(\S+)",line)
        try:
            url = re.search(" url=(\S+)",line)
            url = url.group(1)
        except:
            url = "null"
        time = re.search("(^\S+)",line)
        .... #extracting field names
        row = Row(time = time.group(1), ttms = ttms.group(1), chi = chi.group(1), pssc = pssc.group(1), cqhm = cqhm.group(1), rtype = rtype.group(1), rdetails = rdetails.group(1), rurl = rurl.group(1), qtype = qtype.group(1), phn = phn.group(1), fqdn = fqdn, url = url)
    return row

file = sc.textFile("C:\\Logs\\filename.log").cache()
result = file.map(lambda x: x.split("\n"))
lines = result.map(func1).collect()

df = sqlContext.createDataFrame(lines)
df.registerTempTable("df")
df.show()

for line in df:
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()

df1 = sqlContext.createDataFrame(query1)
df2 = sqlContext.createDataFrame(query2)
df3 = sqlContext.createDataFrame(query3)
df4 = sqlContext.createDataFrame(query4)

df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")

print(time.time() - start_time)
This program takes almost 200 seconds to execute, which is a very long time, and I can't figure out why (my log file contains around 34k log lines). I tried using Spark's filter for the regex matching, but I get an error saying the RDD is not iterable. So I need to know how I can optimize my program to make it run faster.
Also, I get the warning that "stage x contains a task of very large size". I tried broadcasting "lines", but that gave an error.
Here are some reasons why this Spark code will run slower than pure Python code:
1) Using only one machine
sc = SparkContext("local","test")
PySpark might perform better than pure Python when running on a cluster; in "local" mode, Spark has no advantage over pure Python.
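If you have to stay on a single machine, you can at least let Spark use all available cores. A minimal sketch replacing the line quoted above (the master URL "local[*]" simply requests one worker thread per core; it does not remove the single-machine limitation):
# use all local cores instead of a single worker thread
sc = SparkContext("local[*]", "test")
sqlContext = SQLContext(sc)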
2) using "cache" when it isn't used
file = sc.textFile("C:\\Logs\\filename.log").cache()
.cache() should be used only when the same RDD is accessed more than once; "file" is used only once here, so there is no need to cache it.
3) using "collect()"
lines = result.map(func1).collect()
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
As a general rule, avoid "collect()" unless it is really needed - it pulls the whole result set back to the driver.
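For instance, the parsed rows never need to reach the driver before the DataFrame is built; sqlContext.createDataFrame accepts an RDD of Row objects directly. A sketch, assuming func1 is adjusted to parse a single log line into a Row:
# keep the parsed rows distributed; no collect() needed
rows = file.map(func1)
df = sqlContext.createDataFrame(rows)
df.registerTempTable("df")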
4) using "toPandas()"
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")
"toPandas()" implementation start with execution of "collect()" (see #3)
Since spark 2.0, you can write dataframe to CSV directly:
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
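Applied to the example above, each result could be written to disk without going through pandas. A sketch (note that the path becomes a directory of CSV part files rather than a single file; the header option assumes Spark 2.0+):
# writes a directory of CSV part files instead of a single .csv file
df1.write.csv("C:\\Sample logs\\sqlcsv1", header=True)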
5) I'm not sure that I understand the code below:
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
What are you trying to achieve with "for line in df:"?
If the dataframe holds 100,000 rows, do you plan to execute this for-loop 100,000 times?
It seems like the variables query1, query2, query3 and query4 will hold only the result of the last iteration of the for-loop (their values appear to be overwritten every time you read a new "line" from "df") - is that intentional?
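If the goal is simply to compute the four aggregations, each query only needs to run once, outside any loop, and the results can be written out directly (a sketch combining this with points 3 and 4; the COUNT(*) AS cnt aliases are my own naming, not from the original code):
# run each aggregation once; no for-loop, no collect()
query1 = sqlContext.sql("SELECT qtype, rtype, COUNT(*) AS cnt FROM df GROUP BY qtype, rtype")
query2 = sqlContext.sql("SELECT qtype, COUNT(*) AS cnt FROM df GROUP BY qtype")
query3 = sqlContext.sql("SELECT rtype, COUNT(*) AS cnt FROM df GROUP BY rtype")
query4 = sqlContext.sql("SELECT rtype, COUNT(*) AS cnt FROM df WHERE qtype = 'HTTP' GROUP BY rtype")

# each result is already a DataFrame, so it can be written directly (see #4)
query1.write.csv("C:\\Sample logs\\sqlcsv1", header=True)
query2.write.csv("C:\\Sample logs\\sqlcsv2", header=True)
query3.write.csv("C:\\Sample logs\\sqlcsv3", header=True)
query4.write.csv("C:\\Sample logs\\sqlcsv4", header=True)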
6) You can create a DataFrame directly from an RDD
e.g. using
sqlContext.createDataFrame
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
or
RDD.toDF()
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html
toDF(*cols)
Returns a new DataFrame with the specified column names.
Parameters: cols – list of new column names (string)
>>> df.toDF('f1', 'f2').collect()
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')]
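Applied to the program above, the collect()/createDataFrame round-trip could be replaced by converting the mapped RDD in place. A sketch under the same assumption as in #3 (func1 returns one Row per log line); toDF() on an RDD of Rows works once a SQLContext has been created:
# build the DataFrame straight from the RDD of Rows
df = file.map(func1).toDF()
df.registerTempTable("df")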