
Apache Spark java heap space error during matrix multiplication

I'm using Spark 2.0.1 with two workers (one executor each, 20 GB per executor), and I run the following code:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.*;

JavaRDD<MatrixEntry> entries = ...; // filling in the data
CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
BlockMatrix cooc = matrix.transpose().multiply(matrix);

My matrix has roughly 3,000 columns and contains 10,000,000 non-zero cells, each equal to 1.0. That is not big data at all, yet during the multiplication I always get:

17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@649e7019
17/01/24 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
        at org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
        at org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

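A back-of-envelope estimate (my own numbers, not from the Spark docs) helps explain the trace: the allocation fails in `DenseMatrix$.zeros`, i.e. Spark builds dense result blocks during the multiplication, and even a single dense 3000 × 3000 matrix of doubles is sizeable before any copies or shuffle buffers:

```java
public class DenseBlockFootprint {
    public static void main(String[] args) {
        // One dense 3000 x 3000 matrix of doubles, 8 bytes per element.
        long bytes = 3000L * 3000L * 8L;          // 72,000,000 bytes
        long megabytes = bytes / (1024L * 1024L); // about 68 MB per copy
        System.out.println(megabytes + " MB");    // prints "68 MB"
    }
}
```

Each task can hold several such dense blocks at once, plus shuffle buffers, so heap usage grows far beyond the size of the sparse input.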
Now I'm even trying to use only one core per executor. What could the problem be? And how can I debug it and find the root cause? Thanks.

Update: details of the failed stage:

org.apache.spark.rdd.RDD.flatMap(RDD.scala:374)
org.apache.spark.mllib.linalg.distributed.BlockMatrix.multiply(BlockMatrix.scala:478)
MyClass.generate(SimilarityGenerator.java:57)
MyClass.main(GenerateSimilarity.java:54)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Osmin asked Apr 21 '26 14:04
1 Answer

It turns out that sparse matrix multiplication is not implemented the way I assumed. Spark multiplies block matrices via dense blocks, allocating full blocks even when almost all cells are zero (hence the `DenseMatrix$.zeros` in the trace). We implemented our own multiplication on coordinate entries instead. Here is the Scala code (also adapted from elsewhere):

def multiply(left: CoordinateMatrix, right: CoordinateMatrix): CoordinateMatrix = {
  // Key left entries by their column index and right entries by their row
  // index, so the join pairs up exactly the terms of each product cell.
  val leftEntries = left.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
  val rightEntries = right.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

  val productEntries = leftEntries
    .join(rightEntries)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), v * w) })
    .reduceByKey(_ + _) // sum the partial products for each (row, col) cell
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

  new CoordinateMatrix(productEntries)
}
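To sanity-check the same join-then-reduce logic without a cluster, here is a plain-Java sketch on in-memory collections (the class name and the `double[]{row, col, value}` encoding are mine, not Spark's):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CooMultiply {

    // Returns the product as a map from "row,col" keys to cell values.
    static Map<String, Double> multiply(List<double[]> left, List<double[]> right) {
        // Index left entries by column and right entries by row: the shared
        // inner index is the join key, as in the RDD version.
        Map<Long, List<double[]>> leftByJ = new HashMap<>();
        for (double[] e : left) {
            leftByJ.computeIfAbsent((long) e[1], k -> new ArrayList<>()).add(e);
        }
        Map<Long, List<double[]>> rightByJ = new HashMap<>();
        for (double[] e : right) {
            rightByJ.computeIfAbsent((long) e[0], k -> new ArrayList<>()).add(e);
        }
        // Multiply matching pairs and sum partial products per output cell,
        // mirroring join(...).map(...).reduceByKey(_ + _).
        Map<String, Double> product = new HashMap<>();
        for (Long j : leftByJ.keySet()) {
            if (!rightByJ.containsKey(j)) continue;
            for (double[] l : leftByJ.get(j)) {
                for (double[] r : rightByJ.get(j)) {
                    String cell = (long) l[0] + "," + (long) r[1];
                    product.merge(cell, l[2] * r[2], Double::sum);
                }
            }
        }
        return product;
    }

    public static void main(String[] args) {
        List<double[]> left = List.of(
            new double[]{0, 0, 1.0}, new double[]{0, 1, 2.0}, new double[]{1, 0, 3.0});
        List<double[]> right = List.of(
            new double[]{0, 0, 1.0}, new double[]{1, 1, 4.0});
        // [[1,2],[3,0]] * [[1,0],[0,4]] = [[1,8],[3,0]]
        System.out.println(multiply(left, right)); // three non-zero cells
    }
}
```

Because only matching entry pairs are ever materialized, memory usage scales with the number of non-zero products, never with the dense dimensions.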
Osmin answered Apr 24 '26 03:04

