Hi,
I have a DataFrame in a SparkContext with 400k rows and 3 columns. The driver has 143.5 GB of storage memory:
16/03/21 19:52:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55613 with 143.5 GB RAM, BlockManagerId(driver, localhost, 55613)
16/03/21 19:52:35 INFO BlockManagerMaster: Registered BlockManager
I want to return the contents of this DataFrame as a Pandas DataFrame.
I did
df_users = UserDistinct.toPandas()
but I get this error:
16/03/21 20:01:08 ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.grow(Unknown Source)
at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.write(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/03/21 20:01:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.grow(Unknown Source)
at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.write(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
How is this possible if I have 143.5 GB of RAM? What can I do?
EDIT
My spark-defaults.conf:
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
#spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 200g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
My SparkContext:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.app.name", "teste")
conf.set("spark.driver.maxResultSize", "0")
sc = SparkContext(conf=conf)
EDIT
All steps
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer

# import the data into a Pandas DataFrame
df_ora = pd.read_sql(query, con=connection)
# convert to a Spark DataFrame and apply some transformations
sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")
# new column
df_with_C = df_oraAS.withColumn("BUY", lit(1))
# index the e-mail column because I want to use a recommendation system
indexer = StringIndexer(inputCol="ENT_EMAIL", outputCol="user")
user_PK = indexer.fit(df_with_C).transform(df_with_C)
# drop duplicates
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL', 'user'])
# collect the data into a Pandas DataFrame
df_users = UserDistinct.toPandas()
New Edit
Changed to 60g for the driver and 60g for the executor.
Error:
16/03/22 09:53:40 INFO MemoryStore: Block taskresult_446 stored as bytes in memory (estimated size 1978.5 MB, free 22.5 GB)
16/03/22 09:53:40 INFO BlockManagerInfo: Added taskresult_446 in memory on localhost:56281 (size: 1978.5 MB, free: 20.4 GB)
16/03/22 09:53:40 INFO Executor: Finished task 14.0 in stage 6.0 (TID 446). 2074557399 bytes result sent via BlockManager)
16/03/22 09:53:40 INFO TaskSetManager: Starting task 25.0 in stage 6.0 (TID 457, localhost, partition 25,NODE_LOCAL, 1999 bytes)
16/03/22 09:53:40 INFO Executor: Running task 25.0 in stage 6.0 (TID 457)
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:04 ERROR Executor: Exception in task 18.0 in stage 6.0 (TID 450)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.grow(Unknown Source)
at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.write(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:04 INFO TaskSetManager: Starting task 26.0 in stage 6.0 (TID 458, localhost, partition 26,NODE_LOCAL, 1999 bytes)
16/03/22 09:54:04 INFO Executor: Running task 26.0 in stage 6.0 (TID 458)
16/03/22 09:54:04 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.grow(Unknown Source)
at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.write(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:05 INFO SparkContext: Invoking stop() from shutdown hook
16/03/22 09:54:06 WARN QueuedThreadPool: 6 threads could not be stopped
16/03/22 09:54:06 INFO SparkUI: Stopped Spark web UI at http://10.10.5.105:4040
16/03/22 09:54:08 INFO DAGScheduler: ResultStage 6 (toPandas at <stdin>:1) failed in 385.120 s
16/03/22 09:54:08 INFO DAGScheduler: Job 3 failed: toPandas at <stdin>:1, took 398.921433 s
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-1
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Exception in thread "task-result-getter-1" java.lang.Error: java.lang.InterruptedException
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
... 3 more
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 1378, in toPandas
return pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 280, in collect
port = self._jdf.collectToPython()
File "C:\Users\user\Anaconda\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:/Apache/spark-1.6.0/python/pyspark\sql\utils.py", line 45, in deco
return f(*a, **kw)
File "C:\Users\user\Anaconda\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
One thing you can try is increasing the value of spark.sql.shuffle.partitions so that each task processes a smaller slice of the data.
More generally, a java.lang.OutOfMemoryError is usually thrown when there is insufficient space to allocate an object in the Java heap: the garbage collector cannot make enough space available to accommodate the new object, and the heap cannot be expanded any further. (If -XX:MaxMetaspaceSize has been set on the command line, you can also increase its value; Metaspace is allocated from the same address space as the Java heap, so reducing the size of the Java heap makes more space available for Metaspace.)
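For example, a minimal sketch using the SQLContext created above (400 is only an illustrative value; tune it to your data):
sqlContext.setConf("spark.sql.shuffle.partitions", "400")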
For some reason Spark wants to serialize some data. Apparently it does so by writing to a ByteArrayOutputStream. From the docs:
This class implements an output stream in which the data is written into a byte array. The buffer automatically grows as data is written to it. The data can be retrieved using toByteArray() and toString().
The key phrase here is a (one!) byte array. Java byte arrays have a maximum length of 2^31 - 1 = 2147483647 bytes, i.e. 2 GB. So as soon as Spark attempts to serialize anything larger than 2 GB, you'll get an OutOfMemoryError.
And that's exactly what happened here.
To solve this issue, file a bug report with Spark. The culprit is org.apache.spark.serializer.JavaSerializerInstance.serialize(), which assumes that nothing you ever want to serialize can be larger than 2 GB in its serialized form.
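As a workaround in the meantime (a sketch, not from the original answer), you can repartition the DataFrame before collecting it, so that no single task has to serialize a result anywhere near the 2 GB limit:
# illustrative partition count; choose one that keeps each partition's
# serialized result well under 2 GB
UserDistinct = UserDistinct.repartition(200)
df_users = UserDistinct.toPandas()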
I am assuming the storage you are referring to is disk space. What is happening is that your application is running out of RAM, not disk space.
OutOfMemoryError is covered extensively in this Stack Overflow question.
By default, only a limited amount of memory is allocated to your driver and executors, usually somewhere around 500 MB to 5 GB. If you are running Spark locally, you will need to adjust the driver memory.
The Spark Documentation - Memory Management details all of the parameters/options you can configure. These can be found in:
$SPARK_HOME/conf/spark-defaults.conf
Try adjusting your driver-memory in that file.
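For example, a line like the following (12g is only an illustration; size it to your machine):
spark.driver.memory              12g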
However, if you are running your application using spark-submit, you can pass the driver memory as an option, like so:
spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --master local[*] \
  target/scala-2.10/simple-project_2.10-1.0.jar
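One caveat for PySpark scripts launched directly with python (a hedged note, not from the original answer): the driver JVM starts as soon as the SparkContext is created, so setting spark.driver.memory through SparkConf inside the script may come too late to take effect. Passing it on the submit command line, for instance via the PYSPARK_SUBMIT_ARGS environment variable, is one way around that:
import os
# hypothetical example: request 12g for the driver before the JVM is launched
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 12g pyspark-shell"
# ...then build the SparkConf and SparkContext as above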