I have a Spark job that does some processing on ORC data and stores the ORC data back using the DataFrameWriter save() API introduced in Spark 1.4.0. The following piece of code uses a lot of shuffle memory. How do I optimize it? Is there anything wrong with it? It works as expected, but it is slow because of GC pauses and it shuffles a lot of data, so it runs into memory issues. I am new to Spark.
JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, false).map(new Function<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        List<Object> rowAsList;
        Row row1 = null;
        if (row != null) {
            rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
            row1 = RowFactory.create(rowAsList.toArray());
        }
        return row1;
    }
}).union(modifiedRDD);

DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", "date").save("baseTable");
As per the suggestion, I tried to convert the above code to use mapPartitionsWithIndex(), as shown below. I still see data shuffling; it is better than the code above, but it still hits the GC limit, throws an OOM, or goes into a long GC pause and times out, and YARN kills the executor.
I am setting spark.storage.memoryFraction to 0.5 and spark.shuffle.memoryFraction to 0.4 (see the configuration sketch after the code below); I tried the defaults and many other combinations, but nothing helped.
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
    @Override
    public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
            Row row = rowIterator.next();
            List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
            Row updatedRow = RowFactory.create(rowAsList.toArray());
            rowList.add(updatedRow);
        }
        return rowList.iterator();
    }
}, true).coalesce(200, true);
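For reference, this is roughly how the memory fractions mentioned above are being set (Spark 1.x configuration keys; the application name is just a placeholder):

// Sketch of the driver setup; "OrcUpdateJob" is a placeholder name,
// and the fraction values are the ones I have been trying above.
SparkConf conf = new SparkConf()
        .setAppName("OrcUpdateJob")
        .set("spark.storage.memoryFraction", "0.5")
        .set("spark.shuffle.memoryFraction", "0.4");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());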
Coalescing an RDD or DataFrame to a single partition means that all of your processing happens on a single machine. This is not a good thing for a variety of reasons: all of the data has to be shuffled across the network, there is no more parallelism, and so on. Instead you should look at other operators like reduceByKey, mapPartitions, or really pretty much anything besides coalescing the data down to a single machine.
Note: looking at your code, I don't see why you are bringing it down to a single machine; you can probably just remove that part.
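If it helps, here is a rough, untested sketch of that idea: the same per-row transformation done with mapPartitions, keeping the existing partitioning instead of coalescing to one partition. It reuses your iterate() helper and assumes the same imports as your snippets.

// Sketch only: same per-row transformation, but without coalesce(1, false),
// so each partition is processed in parallel on its own executor.
JavaRDD<Row> updatedRdd = orderedFrame.toJavaRDD().mapPartitions(
        new FlatMapFunction<Iterator<Row>, Row>() {
            @Override
            public Iterable<Row> call(Iterator<Row> rows) throws Exception {
                List<Row> out = new ArrayList<>();
                while (rows.hasNext()) {
                    Row row = rows.next();
                    if (row != null) {
                        List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                        out.add(RowFactory.create(rowAsList.toArray()));
                    }
                }
                return out;
            }
        }).union(modifiedRDD);

DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedRdd, renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc")
        .partitionBy("entity", "date").save("baseTable");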