
Why is Spark shuffling huge amounts of data when using union()/coalesce(1, false) on a DataFrame?

I have a Spark job that does some processing on ORC data and stores it back as ORC using the DataFrameWriter save() API introduced in Spark 1.4.0. The following piece of code uses heavy shuffle memory. How do I optimize it? Is there anything wrong with it? It works as expected, but it is slow because of GC pauses, and it shuffles lots of data, so it keeps hitting memory issues. I am new to Spark.

// Collapse the data to a single partition (shuffle = false), rebuild each row,
// then union with modifiedRDD before writing out as partitioned ORC.
JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, false).map(new Function<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        List<Object> rowAsList;
        Row row1 = null;
        if (row != null) {
            rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
            row1 = RowFactory.create(rowAsList.toArray());
        }
        return row1;
    }
}).union(modifiedRDD);
DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", "date").save("baseTable");

Edit

As per the suggestion, I tried converting the above code to use mapPartitionsWithIndex(), shown below after my memory settings. I still see data shuffling; it is better than the code above, but it still hits the GC overhead limit, throws OOM, or goes into long GC pauses and times out, and then YARN kills the executor.

I am using spark.storage.memoryFraction = 0.5 and spark.shuffle.memoryFraction = 0.4. I tried the defaults and changed many combinations, but nothing helped.
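For reference, those two settings can be applied when the context is built. Here is a minimal sketch (the app name "OrcProcessingJob" is hypothetical, and both properties belong to the legacy memory manager of Spark 1.x):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("OrcProcessingJob")              // hypothetical app name
    .set("spark.storage.memoryFraction", "0.5")  // heap fraction reserved for cached data
    .set("spark.shuffle.memoryFraction", "0.4"); // heap fraction reserved for shuffle buffers
JavaSparkContext sc = new JavaSparkContext(conf);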

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
        new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
            @Override
            public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
                // Buffer the transformed rows of this partition into a list.
                List<Row> rowList = new ArrayList<>();
                while (rowIterator.hasNext()) {
                    Row row = rowIterator.next();
                    List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                    rowList.add(RowFactory.create(rowAsList.toArray()));
                }
                return rowList.iterator();
            }
        }, true).coalesce(200, true); // shuffle = true forces a full repartition
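One thing worth noting about the snippet above: buffering every transformed row of a partition into an ArrayList keeps the whole partition in memory at once, which on large partitions can by itself cause long GC pauses and OOM. Since the partition index ind is never used, a plain mapPartitions that wraps the incoming iterator lazily would process one row at a time instead. Below is a sketch against the Spark 1.x Java API (where FlatMapFunction returns an Iterable), reusing the question's iterate() helper and dropping the cache() call, which also competes for heap through spark.storage.memoryFraction:

JavaRDD<Row> transformedRdd = sourceRdd.mapPartitions(
        new FlatMapFunction<Iterator<Row>, Row>() {
            @Override
            public Iterable<Row> call(final Iterator<Row> rowIterator) throws Exception {
                // Wrap the source iterator so rows are transformed lazily,
                // one at a time, instead of being buffered in a list.
                return new Iterable<Row>() {
                    @Override
                    public Iterator<Row> iterator() {
                        return new Iterator<Row>() {
                            @Override
                            public boolean hasNext() {
                                return rowIterator.hasNext();
                            }

                            @Override
                            public Row next() {
                                List<Object> rowAsList =
                                        iterate(JavaConversions.seqAsJavaList(rowIterator.next().toSeq()));
                                return RowFactory.create(rowAsList.toArray());
                            }

                            @Override
                            public void remove() {
                                throw new UnsupportedOperationException();
                            }
                        };
                    }
                };
            }
        });

Also note that coalesce(200, true) is the same as repartition(200): shuffle = true forces a full shuffle of every row. If the 200 partitions are only meant to bound the number of output files, it may be cheaper to drop that step, or to coalesce without the shuffle flag when the source has more than 200 partitions.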

              
asked by Umesh K


1 Answer

Coalescing an RDD or DataFrame to a single partition means that all of your processing happens on a single machine. This is not a good thing for a variety of reasons: all of the data has to be shuffled across the network, you lose all parallelism, and so on. Instead, you should look at other operators like reduceByKey or mapPartitions, or really pretty much anything besides coalescing the data down to a single machine.

Note: looking at your code, I don't see why you are bringing it down to a single machine; you can probably just remove that part.
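Concretely, dropping the coalesce(1, false) from the first snippet keeps the per-row transformation running in parallel across the existing partitions; the rest of the pipeline stays the same (a sketch of the question's own map, minus the single-partition collapse):

JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().map(new Function<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        if (row == null) {
            return null;
        }
        List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
        return RowFactory.create(rowAsList.toArray());
    }
}).union(modifiedRDD);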

answered by Holden