 

Getting java.lang.IllegalArgumentException: requirement failed when calling Spark's MLlib StreamingKMeans from a Java application

I am new to Spark and MLlib, and I'm trying to call StreamingKMeans from my Java application, but I get an exception that I don't understand. Here is the code that transforms my training data:

JavaDStream<Vector> trainingData = sjsc.textFileStream("/training")
            .map(new Function<String, Vector>() {
                public DenseVector call(String line) throws Exception {
                    String[] lineSplit = line.split(",");

                    double[] doubleValues = new double[lineSplit.length];
                    for (int i = 0; i < lineSplit.length; i++) {
                        doubleValues[i] = Double.parseDouble(lineSplit[i] != null ? !""
                                .equals(lineSplit[i]) ? lineSplit[i] : "0" : "0");
                    }
                    DenseVector denseV = new DenseVector(doubleValues);
                    if (denseV.size() != 16) {
                        throw new Exception("All vectors are not the same size!");
                    }
                    System.out.println("Vector length is:" + denseV.size());
                    return denseV;
                }
            });

Here is the code where I call the trainOn method:

int numDimensions = 18;
int numClusters = 2;
StreamingKMeans model = new StreamingKMeans();
model.setK(numClusters);
model.setDecayFactor(.5);
model.setRandomCenters(numDimensions, 0.0, Utils.random().nextLong());

model.trainOn(trainingData.dstream());

And here is the exception I receive:

java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:292)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:485)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:459)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:453)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:453)
    at org.apache.spark.mllib.clustering.KMeansModel.predict(KMeansModel.scala:35)
    at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$predictOnValues$1.apply(StreamingKMeans.scala:258)
    at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$predictOnValues$1.apply(StreamingKMeans.scala:258)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$15.apply(PairRDDFunctions.scala:674)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$15.apply(PairRDDFunctions.scala:674)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)

As you can see in the code above, I am checking that my vectors are all the same size, and they appear to be, even though the error suggests they are not. Any help would be greatly appreciated!

SeanB asked Jun 09 '15

1 Answer

This exception can occur when the vectors are not all of the same dimension.

In my experience, another possible cause is a vector that contains NaN: none of the values in a vector may be NaN.
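As a minimal sketch (not part of the original post) of how both conditions could be enforced before calling trainOn: the parsing function below rejects lines whose dimension does not match the value passed to setRandomCenters and lines that produce NaN. Note that the question's code checks for size 16 while setRandomCenters is given 18, which is exactly the kind of dimension mismatch described above. The class and method names here are illustrative, not from the original post.

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.streaming.api.java.JavaDStream;

    public class VectorValidation {

        // numDimensions is assumed to be the same value passed to
        // StreamingKMeans.setRandomCenters (18 in the question).
        static JavaDStream<Vector> parseAndValidate(JavaDStream<String> lines,
                                                    final int numDimensions) {
            return lines.map(new Function<String, Vector>() {
                public Vector call(String line) throws Exception {
                    String[] parts = line.split(",");
                    if (parts.length != numDimensions) {
                        throw new Exception("Expected " + numDimensions
                                + " values but got " + parts.length + " in line: " + line);
                    }
                    double[] values = new double[parts.length];
                    for (int i = 0; i < parts.length; i++) {
                        // Treat null/empty fields as 0, and reject NaN components.
                        String s = parts[i] == null ? "" : parts[i].trim();
                        values[i] = s.isEmpty() ? 0.0 : Double.parseDouble(s);
                        if (Double.isNaN(values[i])) {
                            throw new Exception("NaN at position " + i + " in line: " + line);
                        }
                    }
                    return Vectors.dense(values);
                }
            });
        }
    }

With a check like this in place, every vector handed to trainOn has the same dimension as the random initial centers and contains no NaN, so the requirement in MLUtils.fastSquaredDistance should hold.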

vito answered Oct 14 '22