I have a JavaPairRDD<Integer, Integer[]> on which I want to perform a groupByKey operation.
The groupByKey operation gives me:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
which is effectively an OutOfMemory error, if I am not mistaken (an executor dies, most likely from OOM, and its shuffle output goes missing). It only occurs on big datasets (in my case, when the "Shuffle Write" shown in the Web UI is ~96GB).
I have set:
spark.serializer org.apache.spark.serializer.KryoSerializer
in $SPARK_HOME/conf/spark-defaults.conf, but I am not sure if Kryo is used to serialize my JavaPairRDD.
Is there anything else I should do, apart from setting this conf parameter, to have Kryo serialize my RDD? I can see in the serialization instructions that:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
and that:
Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
I also noticed that when I set spark.serializer to Kryo, the Shuffle Write in the Web UI increases from ~96GB (with the default serializer) to 243GB!
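For concreteness, this is roughly how explicit class registration with Kryo would look (a sketch only; I have not verified whether it changes the shuffle size):

```java
import org.apache.spark.SparkConf;

// Sketch: register the classes that actually travel through the shuffle so
// Kryo does not have to write full class names for them.
SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Optional: fail fast if something gets serialized without being registered.
        .set("spark.kryo.registrationRequired", "true");
conf.registerKryoClasses(new Class<?>[]{Integer.class, Integer[].class});
```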
EDIT: In a comment, I was asked about the logic of my program, in case groupByKey can be replaced with reduceByKey. I don't think it's possible, but here it is anyway:
Input has the form:
The shuffle write operation produces pairs in the form:
The groupByKey operation gathers all the neighbor arrays of each entity, some possibly appearing more than once (in many buckets).
After the groupByKey operation, I keep a weight for each bucket (based on the number of negative entity ids it contains), and for each neighbor id I sum up the weights of the buckets it belongs to.
I normalize the scores of each neighbor id with another value (let's say it's given) and emit the top-3 neighbors per entity.
The number of distinct keys that I get is around 10 million (around 5 million positive entity ids and 5 million negatives).
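Roughly, the per-entity step after groupByKey looks like the following (a simplified, hypothetical sketch; the weight function and the normalizers are placeholders, not my actual logic):

```java
import java.util.*;

// Simplified, hypothetical sketch of the per-entity step described above.
// The weight function and the normalizers map are placeholders.
static List<Integer> topThreeNeighbors(Iterable<Integer[]> bucketsOfEntity,
                                       Map<Integer, Double> normalizers) {
    Map<Integer, Double> scores = new HashMap<>();
    for (Integer[] bucket : bucketsOfEntity) {
        // Weight of this bucket, based on how many negative entity ids it contains.
        long negatives = Arrays.stream(bucket).filter(id -> id < 0).count();
        double weight = negatives;                       // placeholder weight function
        for (Integer neighbor : bucket) {
            scores.merge(neighbor, weight, Double::sum); // sum bucket weights per neighbor
        }
    }
    // Normalize each neighbor's score by its given value and emit the top 3.
    return scores.entrySet().stream()
            .sorted((a, b) -> Double.compare(
                    b.getValue() / normalizers.getOrDefault(b.getKey(), 1.0),
                    a.getValue() / normalizers.getOrDefault(a.getKey(), 1.0)))
            .limit(3)
            .map(Map.Entry::getKey)
            .collect(java.util.stream.Collectors.toList());
}
```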
EDIT2: I tried using Hadoop's Writables (VIntWritable and VIntArrayWritable, which extends ArrayWritable) instead of Integer and Integer[], respectively, but the shuffle size was still bigger than with the default JavaSerializer.
Then I increased spark.shuffle.memoryFraction from 0.2 to 0.4 (even though it is deprecated in version 2.1.0, with no description of what should be used instead) and enabled off-heap memory, and the shuffle size was reduced by ~20GB. Even though this does what the title asks, I would prefer a more algorithmic solution, or one that includes better compression.
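For reference, the settings involved look like this (a sketch; the off-heap size is just an example value, not a recommendation):

```java
import org.apache.spark.SparkConf;

// Sketch of the memory-related settings mentioned above; the off-heap size is
// only an example value.
SparkConf conf = new SparkConf()
        .set("spark.shuffle.memoryFraction", "0.4")   // deprecated, but still accepted
        .set("spark.memory.offHeap.enabled", "true")
        .set("spark.memory.offHeap.size", "4g");
```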
Short Answer: Use fastutil and maybe increase spark.shuffle.memoryFraction.
More details:
The problem with this RDD is that Java needs to store Object references, which consume much more space than primitive types. In this example, I need to store Integers instead of int values. A Java Integer takes 16 bytes, while a primitive Java int takes only 4 bytes. Scala's Int type, on the other hand, is a 32-bit (4-byte) type, just like Java's int, which may be why Scala users are less likely to run into this.
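As a rough back-of-the-envelope illustration (approximate numbers for a 64-bit JVM with compressed object pointers):

```java
public class BoxingFootprint {
    public static void main(String[] args) {
        // Boxed: the array stores references, and each element is a separate object.
        Integer[] boxed = new Integer[1_000_000];
        for (int i = 0; i < boxed.length; i++) boxed[i] = i;  // distinct Integer objects
        // ~16 B array header + 1,000,000 * 4 B references  ≈  4 MB
        // + 1,000,000 * ~16 B Integer objects              ≈ 16 MB  ->  ~20 MB in total

        // Primitive: the values live inline in the array.
        int[] primitive = new int[1_000_000];
        // ~16 B array header + 1,000,000 * 4 B ints        ≈  4 MB in total
    }
}
```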
Apart from increasing spark.shuffle.memoryFraction to 0.4, another nice solution was to use the fastutil library, as suggested in Spark's tuning documentation:
The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this: Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
This enables storing each element of the int arrays in my pair RDD as an int (i.e., using 4 bytes instead of 16 per element). In my case, I used IntArrayList instead of Integer[]. This made the shuffle size drop significantly and allowed my program to run on the cluster. I also used this library in other parts of the code, where I was building some temporary Map structures. Overall, by increasing spark.shuffle.memoryFraction to 0.4 and using the fastutil library, the shuffle size dropped from 96GB to 50GB (!) using the default Java serializer (not Kryo).
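A minimal sketch of the kind of conversion involved (not my exact code; input is a placeholder for the original JavaPairRDD<Integer, Integer[]>):

```java
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Sketch: convert (Integer, Integer[]) pairs to (Integer, IntArrayList) so the
// neighbor ids are stored as primitive ints rather than boxed Integers.
// "input" is a placeholder name for the original JavaPairRDD<Integer, Integer[]>.
JavaPairRDD<Integer, IntArrayList> compact = input.mapToPair(pair -> {
    IntArrayList neighbors = new IntArrayList(pair._2().length);
    for (Integer id : pair._2()) {
        neighbors.add(id.intValue());   // unbox once here, store as a primitive int
    }
    return new Tuple2<>(pair._1(), neighbors);
});
```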
Alternative: I also tried sorting each int array of the pair RDD and storing the deltas using Hadoop's VIntArrayWritable type (smaller numbers use less space than bigger numbers), but this also required registering VIntWritable and VIntArrayWritable in Kryo, and it didn't save any space after all. In general, I think Kryo only makes things faster, rather than decreasing the space needed, but I am still not sure about that.
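For completeness, the delta-encoding idea looks roughly like this (a sketch only; the actual Writable wrapping is omitted):

```java
import java.util.Arrays;

// Sketch of the delta-encoding idea: after sorting, consecutive ids are close
// together, so the gaps are small numbers that a variable-length encoding
// (e.g. Hadoop's VIntWritable) can store in fewer bytes.
static int[] toSortedDeltas(int[] ids) {
    int[] sorted = ids.clone();
    Arrays.sort(sorted);
    int[] deltas = new int[sorted.length];
    int previous = 0;
    for (int i = 0; i < sorted.length; i++) {
        deltas[i] = sorted[i] - previous;
        previous = sorted[i];
    }
    return deltas;
}
```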
I am not marking this answer as accepted yet, because someone else might have a better idea, and because I didn't use Kryo after all, as my OP was asking. I hope reading it will help someone else with the same issue. I will update this answer if I manage to further reduce the shuffle size.
Still not really sure what you want to do. However, the fact that you use groupByKey and say that there is no way to do it with reduceByKey makes me more confused.
I think you have rdd = (Integer, Integer[]) and you want something like (Integer, Iterable[Integer[]]), which is why you are using groupByKey.
Anyway, I am not really familiar with Java in Spark, but in Scala I would use reduceByKey to cut down the shuffled data via map-side combining:
rdd.mapValues(Iterable(_)).reduceByKey(_ ++ _)
Basically, you convert each value into a one-element collection of arrays and then concatenate the collections per key.
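A rough Java translation of the same idea (a sketch only, not tested; rdd stands for the original JavaPairRDD<Integer, Integer[]>):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;

// Sketch: wrap each array in a single-element list, then concatenate lists per
// key with reduceByKey so that map-side combining happens before the shuffle.
// "rdd" is a placeholder for the original JavaPairRDD<Integer, Integer[]>.
JavaPairRDD<Integer, List<Integer[]>> grouped = rdd
        .mapValues(arr -> {
            List<Integer[]> one = new ArrayList<>();
            one.add(arr);
            return one;
        })
        .reduceByKey((a, b) -> {
            List<Integer[]> merged = new ArrayList<>(a);
            merged.addAll(b);
            return merged;
        });
```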
I think the best approach that can be recommended here (without more specific knowledge of the input data) is to use the persist API on your input RDD.
As a first step, I'd call .persist(MEMORY_ONLY_SER) on the input RDD to lower memory usage (albeit at some CPU overhead, which shouldn't be much of a problem for ints in your case).
If that is not sufficient, you can try .persist(MEMORY_AND_DISK_SER); or, if your shuffle still takes so much memory that the input dataset needs to be kept off the heap entirely, .persist(DISK_ONLY) may be an option, although it will strongly degrade performance.
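In Java that might look like the following (a sketch; input stands for your input pair RDD):

```java
import org.apache.spark.storage.StorageLevel;

// Minimal sketch (assuming the input pair RDD is called "input"): keep the
// input cached in serialized form so it takes less heap before the shuffle.
input.persist(StorageLevel.MEMORY_ONLY_SER());

// If that is still not enough, spill serialized partitions to disk as well:
// input.persist(StorageLevel.MEMORY_AND_DISK_SER());
// Or, as a last resort, keep the input on disk only (much slower):
// input.persist(StorageLevel.DISK_ONLY());
```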