java.lang.OutOfMemoryError in pyspark

Hi,

I have a DataFrame in a SparkContext with 400k rows and 3 columns. The driver has 143.5 GB of storage memory:

16/03/21 19:52:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55613 with 143.5 GB RAM, BlockManagerId(driver, localhost, 55613)
16/03/21 19:52:35 INFO BlockManagerMaster: Registered BlockManager

I want to return the contents of this DataFrame as a Pandas DataFrame.

I did:

df_users = UserDistinct.toPandas()

but I get this error:

16/03/21 20:01:08 ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/21 20:01:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

How is this possible if I have 143.5 GB of RAM? What can I do?

EDIT

My spark-defaults.conf:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
#spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              200g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

My SparkContext:

conf = SparkConf()

conf.set("spark.app.name","teste")
conf.set("spark.driver.maxResultSize","0")

sc = SparkContext(conf=conf)


EDIT

All the steps:

import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer

# import data into a pandas DataFrame
df_ora = pd.read_sql(query, con=connection)

# convert to a Spark DataFrame and do some transformations
sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")

# new column
df_with_C = df_oraAS.withColumn("BUY", lit(1))

indexer = StringIndexer(inputCol="ENT_EMAIL", outputCol="user")

# index because I want to use it for a recommendation system
user_PK = indexer.fit(df_with_C).transform(df_with_C)

# distinct
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL', 'user'])

# collect into a Pandas DataFrame
df_users = UserDistinct.toPandas()

NEW EDIT

Changed to 60g for the driver and 60g for the executor.

Error:

16/03/22 09:53:40 INFO MemoryStore: Block taskresult_446 stored as bytes in memory (estimated size 1978.5 MB, free 22.5 GB)
16/03/22 09:53:40 INFO BlockManagerInfo: Added taskresult_446 in memory on localhost:56281 (size: 1978.5 MB, free: 20.4 GB)
16/03/22 09:53:40 INFO Executor: Finished task 14.0 in stage 6.0 (TID 446). 2074557399 bytes result sent via BlockManager)
16/03/22 09:53:40 INFO TaskSetManager: Starting task 25.0 in stage 6.0 (TID 457, localhost, partition 25,NODE_LOCAL, 1999 bytes)
16/03/22 09:53:40 INFO Executor: Running task 25.0 in stage 6.0 (TID 457)
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:04 ERROR Executor: Exception in task 18.0 in stage 6.0 (TID 450)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:04 INFO TaskSetManager: Starting task 26.0 in stage 6.0 (TID 458, localhost, partition 26,NODE_LOCAL, 1999 bytes)
16/03/22 09:54:04 INFO Executor: Running task 26.0 in stage 6.0 (TID 458)
16/03/22 09:54:04 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:05 INFO SparkContext: Invoking stop() from shutdown hook
16/03/22 09:54:06 WARN QueuedThreadPool: 6 threads could not be stopped
16/03/22 09:54:06 INFO SparkUI: Stopped Spark web UI at http://10.10.5.105:4040
16/03/22 09:54:08 INFO DAGScheduler: ResultStage 6 (toPandas at <stdin>:1) failed in 385.120 s
16/03/22 09:54:08 INFO DAGScheduler: Job 3 failed: toPandas at <stdin>:1, took 398.921433 s
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-1
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "task-result-getter-1" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    ... 3 more
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 1378, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 280, in collect
    port = self._jdf.collectToPython()
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
asked Mar 21 '16 by Kardu



2 Answers

For some reason Spark wants to serialize some data. Apparently it does so by writing to a ByteArrayOutputStream. From the docs:

This class implements an output stream in which the data is written into a byte array. The buffer automatically grows as data is written to it. The data can be retrieved using toByteArray() and toString().

The key word here is a (one!) byte array. Java byte arrays have a maximum length of 2^31 - 1 = 2,147,483,647 bytes, i.e. just under 2 GB. So as soon as Spark attempts to serialize anything whose serialized form is larger than 2 GB, you'll get an OutOfMemoryError.

And that's exactly what happened here: your log already shows a single task result of 1978.5 MB, close to that limit, and the failing tasks evidently crossed it.

To solve this issue, file a bug report with Spark. The culprit is org.apache.spark.serializer.JavaSerializerInstance.serialize(), which assumes that nothing you ever want to serialize can be larger than 2GB in its serialized form.
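
In the meantime, a possible workaround (a minimal sketch, not a guaranteed fix) is to spread the result over more partitions before collecting, so that no single task result has to be serialized into one byte array larger than 2 GB. The partition count of 200 below is just an illustrative value:

# hypothetical workaround: more partitions -> smaller per-task results,
# each staying well under the 2 GB serialization limit
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL', 'user']).repartition(200)
df_users = UserDistinct.toPandas()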

answered Nov 15 '22 by Hendrik


I am assuming the storage you are referring to is disk space.

What is happening is that your application is running out of RAM, not disk space.

OutOfMemoryError is covered extensively in this Stack Overflow question.

By default, only so much memory is allocated to the driver and to each executor, usually around 500 MB to 5 GB. If you are running Spark locally, you will need to adjust the driver memory.

The Spark documentation on memory management details all of the parameters/options you can configure. These can be set in:

$SPARK_HOME/conf/spark-defaults.conf

Try adjusting your driver-memory in that file.
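
For example (12g here is just an illustrative value; choose what fits your machine):

spark.driver.memory              12g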

However, if you are running your application using spark-submit, you can pass the driver-memory as an option like so:

spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --master local[*] \
  target/scala-2.10/simple-project_2.10-1.0.jar
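
Note that driver memory has to be set before the driver JVM starts, so setting it from inside an already-running SparkContext has no effect. Since your traceback suggests an interactive PySpark session on Windows (Spark under C:/Apache/spark-1.6.0), the rough equivalent is to pass the option to the launcher:

C:\Apache\spark-1.6.0\bin\pyspark --driver-memory 12g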
answered Nov 15 '22 by Brian