
Spark program gives odd results when run on standalone cluster

I have this Spark program, and I'll try to limit it to just the pertinent parts. (K, N, m, delta, the weights, epsilon, the NCM class, and helpers such as getMemberships are defined elsewhere.)

# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()

# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
    tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)

convergence = False

ncm = NCM()

while(not convergence):
    # centroids is a plain Python list on the driver; it is captured by these
    # closures and serialized out to the workers with each task
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids, m)))
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids)))
    # Memberships
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids, m, delta, weight1, weight2, weight3, c)))
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids, m, delta, weight1, weight2, weight3, c)[0]))
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids, m, delta, weight1, weight2, weight3, c)[0]))
    # Components of new centroids
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
    #print "wTm = " + str(wTm.collect())
    print "at first reduce"
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
    #print "sumwTm = " + str(sumwTm.collect())
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
    print "adding to cnumerator list"
    #print wTmx.collect()
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
    print "collected cnumerator, now printing"    
    #print "cnumerator = " + str(cnumerator.collect())
    #print str(sumwTm.collect())
    # Calculate the new centroids
    sumwTmCollection = sumwTm.collect()[0][1]
    cnumeratorCollection = cnumerator.collect()
    #print "sumwTmCollection = " + str(sumwTmCollection)
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items
    print "cnumeratorCollection = " + str(cnumeratorCollection)
    for i in range(len(newCentroids)):
        newCentroids[i] = scalarMult(1.0 / sumwTmCollection[i], [cnumeratorCollection[i]])
    # Test for convergence *before* replacing the old centroids; otherwise
    # centroids and newCentroids refer to the same list and the test is meaningless
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)
    #convergence = True
    # Replace our old centroids with the newly found centroids and repeat if convergence not met
    centroids = newCentroids
    # Clear out space for a new set of centroids
    newCentroids = [[] for k in range(K)]

This program works pretty well on my local machine; however, it does not behave as expected when run on a standalone cluster. It doesn't necessarily throw an error, but it gives different output than what I get when running on my local machine. The cluster and its 3 nodes seem to be working fine. I have a feeling the problem is that I keep updating centroids, a Python list that changes on each pass through the while-loop. Is it possible that each node doesn't have the most recent copy of that list? I think so, so I tried using a broadcast variable, but those can't be updated (they're read-only). I also tried an accumulator, but those are just for accumulations. I also tried saving the Python lists to a file on HDFS so each node could access it, but that didn't work well. Am I understanding the problem correctly? Is something else likely going on here? Why would code that works fine on my local machine misbehave on a cluster?

Matt Cremeens · asked Apr 23 '16



1 Answer

Thank you for all of the time and attention given to this problem, especially since it sounds like I could have posted more information to make your job easier. The problem here is in

centroids = points.takeSample(False, K, 34)

I didn't realize this, but after a short experiment I found that this call returns the same output each and every time, despite being what I thought was a random sample. As long as you use the same seed (34 in this case), takeSample gives you the same sample back. For some reason the sample drawn on my cluster differed from the one drawn on my local machine, but since each environment got the same sample on every run, my output there never changed. The problem with the particular "random" centroids returned to me is that they gave rise to something like a saddle point in mathematics, where no convergence of the centroids would be found. That part of the answer is more mathematical than programming-related, so I won't go into it further. My real hope at this point is that others are helped by the notion that if you want

centroids = points.takeSample(False, K, 34)

to produce a different sample each time it is called, you need to change the seed to a new random number on each call.

I hope this all helps. To my memory, I've never before spent so much time on a single solution.

Thanks again.

Matt Cremeens · answered Sep 26 '22