I'm using Spark 2.0.1 with two workers (one executor each), 20 GB each, and I run the following code:
JavaRDD<MatrixEntry> entries = ...; // filling the data
CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
BlockMatrix cooc = matrix.transpose().multiply(matrix);
My matrix contains 10,000,000 non-empty cells (each equal to 1.0) and has approximately 3,000 columns. That is not much data, but during the multiplication I always get:
17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@649e7019
17/01/24 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
at org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
at org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
I have even tried using only one core per executor. What could be the problem? How can I debug it and find the root cause? Thanks.
Update: details of the failed stage:
org.apache.spark.rdd.RDD.flatMap(RDD.scala:374)
org.apache.spark.mllib.linalg.distributed.BlockMatrix.multiply(BlockMatrix.scala:478)
MyClass.generate(SimilarityGenerator.java:57)
MyClass.main(GenerateSimilarity.java:54)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
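The top frames of the executor trace (DenseMatrix$.zeros called from SparseMatrix.multiply) suggest that every block-level product is allocated as a dense matrix. A rough estimate in plain Java of what that could mean for toBlockMatrix(100, 1000) is sketched below. The row count of 1,000,000 is purely an assumption for illustration (the number of rows is not stated here), the class and method names are hypothetical, and the count is an upper bound because empty blocks are not stored and partial products are combined as they are produced.

```java
final class DenseBlockEstimate {
    // Bytes for a dense rows x cols matrix of doubles (8 bytes each), ignoring object headers.
    static long denseBytes(long rows, long cols) {
        return rows * cols * 8L;
    }

    // Upper bound on block-level partial products in transpose(A) * A,
    // where A is split into rowsPerBlock x colsPerBlock blocks.
    static long partialProducts(long numRows, long numCols, int rowsPerBlock, int colsPerBlock) {
        long rowBlocks = (numRows + rowsPerBlock - 1) / rowsPerBlock; // ceiling division
        long colBlocks = (numCols + colsPerBlock - 1) / colsPerBlock;
        // One partial product per (result block row, result block column, shared block index).
        return colBlocks * colBlocks * rowBlocks;
    }

    public static void main(String[] args) {
        long assumedRows = 1_000_000L; // ASSUMPTION: not given in the question
        long cols = 3_000L;            // "approx. 3000 columns"
        // A transposed block is 1000 x 100 and a block is 100 x 1000,
        // so each partial product is a dense 1000 x 1000 matrix.
        long perProduct = denseBytes(1000, 1000);
        long count = partialProducts(assumedRows, cols, 100, 1000);
        System.out.println("bytes per dense partial product: " + perProduct);
        System.out.println("partial products (upper bound):  " + count);
        System.out.println("total bytes if all were kept:    " + perProduct * count);
    }
}
```

Under these assumptions a single partial product is small, but tens of thousands of them are generated regardless of how few non-zero cells each block holds, which would be consistent with the heap filling up during the shuffle write.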
It turned out that sparse matrix multiplication is not implemented the way I expected. Spark multiplies the blocks in full, even when almost all of their cells are zero. We implemented our own multiplication. Here is the Scala code (also copied from elsewhere):
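import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Multiplies two sparse matrices entry by entry, so zero cells are never materialized.
def multiply(left: CoordinateMatrix, right: CoordinateMatrix): CoordinateMatrix = {
  // Key both sides by the shared dimension j: left(i, j) and right(j, k).
  val leftEntries = left.entries.map { case MatrixEntry(i, j, v) => (j, (i, v)) }
  val rightEntries = right.entries.map { case MatrixEntry(j, k, w) => (j, (k, w)) }
  val productEntries = leftEntries
    .join(rightEntries)                                     // pair up cells that share j
    .map { case (_, ((i, v), (k, w))) => ((i, k), v * w) }  // partial product for cell (i, k)
    .reduceByKey(_ + _)                                     // sum the partial products per cell
    .map { case ((i, k), sum) => MatrixEntry(i, k, sum) }
  new CoordinateMatrix(productEntries)
}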
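
To check the logic of the join-based multiplication without a cluster, the same idea can be sketched in plain Java on in-memory lists. This is only an illustration, not Spark code: the class SparseCooMultiply and its Entry type are hypothetical stand-ins for CoordinateMatrix and MatrixEntry, and a hash map plays the role of join and reduceByKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SparseCooMultiply {
    // One non-zero cell (row, col, value); hypothetical stand-in for MatrixEntry.
    static final class Entry {
        final long row;
        final long col;
        final double value;

        Entry(long row, long col, double value) {
            this.row = row;
            this.col = col;
            this.value = value;
        }
    }

    // Returns the non-zero cells of left * right, keyed by [row, col].
    static Map<List<Long>, Double> multiply(List<Entry> left, List<Entry> right) {
        // Index the right-hand cells by their row j, the dimension shared with left's columns.
        Map<Long, List<Entry>> rightByRow = new HashMap<>();
        for (Entry r : right) {
            rightByRow.computeIfAbsent(r.row, key -> new ArrayList<>()).add(r);
        }
        Map<List<Long>, Double> product = new HashMap<>();
        for (Entry l : left) {
            List<Entry> matches = rightByRow.get(l.col); // the "join" on j
            if (matches == null) {
                continue; // no cell in right shares this j, so nothing to add
            }
            for (Entry r : matches) {
                // Accumulate v * w into cell (i, k), like reduceByKey(_ + _).
                product.merge(Arrays.asList(l.row, r.col), l.value * r.value, Double::sum);
            }
        }
        return product;
    }

    public static void main(String[] args) {
        // A = [[1, 1], [0, 1]] as coordinate entries, and its transpose.
        List<Entry> a = Arrays.asList(new Entry(0, 0, 1.0), new Entry(0, 1, 1.0), new Entry(1, 1, 1.0));
        List<Entry> aT = Arrays.asList(new Entry(0, 0, 1.0), new Entry(1, 0, 1.0), new Entry(1, 1, 1.0));
        // transpose(A) * A, the co-occurrence product from the question.
        System.out.println(multiply(aT, a));
    }
}
```

Only pairs of non-zero cells that share the index j are ever touched, so the work and memory scale with the number of matching non-zero pairs instead of with the dense block size.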