The following simple Spark program takes 4 minutes to run, and I don't know what is wrong with the code.
First, I generate a VERY small RDD:
D = spark.sparkContext.parallelize([(0,[1,2,3]),(1,[2,3]),(2,[0,3]),(3,[1])]).cache()
Then I generate a vector:
P1 = spark.sparkContext.parallelize(list(zip(list(range(4)),[1/4]*4))).cache()
Then I define a function to do the map step:
def MyFun(x):
    L0 = len(x[2])
    L = []
    for i in x[2]:
        L.append((i, x[1]/L0))
    return L
Then I execute the following code:
P0 = P1
D0 = D.join(P1).map(lambda x: [x[0],x[1][1],x[1][0]]).cache()
C0 = D0.flatMap(lambda x: MyFun(x)).cache()
P1 = C0.reduceByKey(lambda x,y:x+y).mapValues(lambda x:x*1.2+3.4).sortByKey().cache()
Diff = P1.join(P0).map(lambda x: abs(x[1][0]-x[1][1])).sum()
Given that my data is so small, I can't figure out why this piece of code runs so slowly...
I have a few suggestions to help you speed up this job.
Caching materializes an RDD and stores its partitions (in memory by default, possibly spilling to disk depending on the storage level), and that materialization itself costs time and memory. Caching every step can therefore slow the job down instead of speeding it up; it only pays off for data that is actually reused.
I would suggest caching only P1: it is the only RDD that is used more than once (in the Diff computation and, presumably, in the next iteration), while D0 and C0 are each consumed a single time.
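For illustration, here is a minimal sketch of the same step with only P1 cached (the final unpersist is my addition, assuming this block runs inside an iteration loop):

P0 = P1
D0 = D.join(P1).map(lambda x: [x[0], x[1][1], x[1][0]])   # consumed once, so no cache
C0 = D0.flatMap(MyFun)                                     # consumed once, so no cache
P1 = (C0.reduceByKey(lambda x, y: x + y)
        .mapValues(lambda x: x * 1.2 + 3.4)
        .sortByKey()
        .cache())                                          # reused below and in the next iteration
Diff = P1.join(P0).map(lambda x: abs(x[1][0] - x[1][1])).sum()
P0.unpersist()                                             # optionally drop the previous iteration's cached result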
Next, I strongly suggest using the DataFrame API: Spark can then apply query optimizations for you, such as predicate pushdown.
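As a rough sketch (the column names src, dst_list, node and rank are illustrative, not taken from your code), the two inputs could be expressed as DataFrames and joined like this:

edges = D.toDF(["src", "dst_list"])    # (node, list of outgoing nodes)
ranks = P1.toDF(["node", "rank"])      # (node, current value)

joined = edges.join(ranks, edges.src == ranks.node)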
Last but not least, custom functions are expensive as well, because every row has to be passed through the Python interpreter instead of being handled by Spark's optimized execution engine. If you use DataFrames, try to stick to the built-in functions (pyspark.sql.functions in Python, org.apache.spark.sql.functions in Scala).
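For example, MyFun just spreads each row's value evenly over its list. Continuing from the joined DataFrame sketched above, explode and size from pyspark.sql.functions express the same thing without calling any Python code per row (column names are again illustrative):

from pyspark.sql import functions as F

contribs = (joined
            .withColumn("share", F.col("rank") / F.size("dst_list"))   # value / len(list), as in MyFun
            .withColumn("dst", F.explode("dst_list"))                   # one output row per target, like the flatMap
            .groupBy("dst")
            .agg((F.sum("share") * 1.2 + 3.4).alias("rank")))           # replaces reduceByKey + mapValues

The final Diff can then be computed with one more join plus F.abs and F.sum in the same style.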
Finally, I suggest profiling the job through the Spark UI: with data this small, the problem may not be your code at all but something on the cluster side, such as a slow or overloaded node.