 

Spark Error - Max iterations (100) reached for batch Resolution

I am working on Spark SQL, where I need to find the diff between two large CSVs.

The diff should give:

  • Inserted rows (new records) // comparing only IDs

  • Changed rows (not including the inserted ones) - comparing all column values

  • Deleted rows // comparing only IDs

Spark 2.4.4 + Java

I am using Databricks to read/write the CSVs.

Dataset<Row> insertedDf = newDf_temp.join(oldDf_temp,oldDf_temp.col(key)
                .equalTo(newDf_temp.col(key)),"left_anti"); // Ids in new but not in old => inserted
Long insertedCount = insertedDf.count();
logger.info("Inserted File Count == "+insertedCount);


Dataset<Row> deletedDf = oldDf_temp.join(newDf_temp,oldDf_temp.col(key)
                .equalTo(newDf_temp.col(key)),"left_anti")
                .select(oldDf_temp.col(key)); // Ids in old but not in new => deleted
Long deletedCount = deletedDf.count();
logger.info("deleted File Count == "+deletedCount);


Dataset<Row> changedDf = newDf_temp.exceptAll(oldDf_temp); // This gives rows (New +changed Records)

Dataset<Row> changedDfTemp = changedDf.join(insertedDf, changedDf.col(key)
                .equalTo(insertedDf.col(key)),"left_anti"); // This gives only changed record

Long changedCount = changedDfTemp.count();
logger.info("Changed File Count == "+changedCount);

This works well for CSVs with up to 50 or so columns.

The code above fails even for a CSV with a single row once it has 300+ columns, so I am sure this is not a file-size problem.

Whenever a CSV has 300+ columns, it fails with the exception:

Max iterations (100) reached for batch Resolution – Spark Error

If I set the property below in Spark, it works:

sparkConf.set("spark.sql.optimizer.maxIterations", "500");

But my question is: why do I have to set this?

Is there something wrong with what I am doing, or is this behaviour expected for CSVs with a large number of columns?

Can I optimize the code in any way to handle CSVs with many columns?

Asked Dec 07 '25 by Art


1 Answer

The issue you are running into is related to how Spark takes the instructions you give it and transforms them into the actual work it is going to do. It first needs to understand your instructions by running the Analyzer, and then it tries to improve them by running its Optimizer. The setting appears to apply to both.

Specifically, your code is bombing out during a step in the Analyzer. The Analyzer is responsible for figuring out, when you refer to things, what you are actually referring to: for example, mapping function names to implementations, or mapping column names across renames and different transforms. It does this in multiple passes, resolving additional things on each pass, then checking again to see whether it can resolve more.

I think what is happening in your case is that each pass probably resolves one column, but 100 passes isn't enough to resolve all of the columns. By increasing the limit you give the Analyzer enough passes to get all the way through your plan. This is definitely a red flag for a potential performance issue, but if your code is working then you can probably just increase the value and not worry about it.

If it isn't working, then you will probably need to do something to reduce the number of columns used in your plan, for example combining all the columns into one encoded string column as the key. You might also benefit from checkpointing the data before doing the join so you can shorten your plan. A rough sketch of both ideas follows below.
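
If you go that route, here is a minimal Java sketch of both ideas, assuming a SparkSession named spark, a key column named "id", a writable checkpoint directory, and that newDf/oldDf are the two loaded Datasets (all of these names are placeholders, not from the original code):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Collapse all non-key columns into one hash column so the Analyzer only
// has to resolve two columns per side instead of 300+.
// Note: concat_ws skips nulls, so consider coalescing nulls to a sentinel first.
Column[] valueCols = java.util.Arrays.stream(newDf.columns())
        .filter(c -> !c.equals("id"))
        .map(c -> col(c))
        .toArray(Column[]::new);

Dataset<Row> newSlim = newDf.select(col("id"),
        sha2(concat_ws("||", valueCols), 256).as("row_hash"));
Dataset<Row> oldSlim = oldDf.select(col("id"),
        sha2(concat_ws("||", valueCols), 256).as("row_hash"));

// Checkpointing materializes the data and truncates the plan/lineage before the joins.
// The directory is hypothetical; on Databricks use a real DBFS path.
spark.sparkContext().setCheckpointDir("/tmp/spark-checkpoints");
Dataset<Row> newReady = newSlim.checkpoint();
Dataset<Row> oldReady = oldSlim.checkpoint();

Note that the hash only tells you that a row changed; to see which columns changed you would still go back to the original columns, but only for the IDs flagged as changed.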

EDIT:

Also, I would refactor the code above so you can do everything with only one join. This should be a lot faster, and it might solve your other problem too.

Each join leads to a shuffle (data being sent between compute nodes), which adds time to your job. Instead of computing adds, deletes and changes independently, you can do them all at once. Something like the code below; it's Scala pseudo-code because I'm more familiar with that than with the Java APIs.

import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark

var oldDf = ..
var newDf = ..
val changeCols = newDf.columns.filter(_ != "id").map(col)

// Make the columns you want to compare into a single struct column for easier comparison
newDf = newDf.select($"id", struct(changeCols:_*) as "compare_new")
oldDf = oldDf.select($"id", struct(changeCols:_*) as "compare_old")

// Outer join on ID
val combined = oldDf.join(newDf, Seq("id"), "outer")

// Figure out the status of each row based upon presence of old/new
//  IF old side is missing, must be an ADD
//  IF new side is missing, must be a DELETE
//  IF both sides present but different, it's a CHANGE
//  ELSE it's NOCHANGE
val status = when($"compare_old".isNull, lit("add")).
             when($"compare_new".isNull, lit("delete")).
             when($"compare_new" =!= $"compare_old", lit("change")).
             otherwise(lit("nochange"))

val labeled = combined.select($"id", status as "status")

At this point we have every ID labeled ADD/DELETE/CHANGE/NOCHANGE, so we can just do a groupBy/count. This aggregation can be done almost entirely map side, so it will be a lot faster than another join.

labeled.groupBy("status").count.show
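
Since the question is on Spark 2.4.4 with Java, roughly the same single-join approach in the Java Dataset API could look like the sketch below (again assuming the key column is called "id" and that newDf/oldDf are the two loaded Datasets; these names are placeholders):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Pack all non-key columns into a struct on each side for easy comparison.
Column[] changeCols = java.util.Arrays.stream(newDf.columns())
        .filter(c -> !c.equals("id"))
        .map(c -> col(c))
        .toArray(Column[]::new);

Dataset<Row> newPacked = newDf.select(col("id"), struct(changeCols).as("compare_new"));
Dataset<Row> oldPacked = oldDf.select(col("id"), struct(changeCols).as("compare_old"));

// One outer join on the key, then classify each row.
Dataset<Row> combined = oldPacked.join(newPacked,
        oldPacked.col("id").equalTo(newPacked.col("id")), "outer");

Column status = when(col("compare_old").isNull(), lit("add"))
        .when(col("compare_new").isNull(), lit("delete"))
        .when(col("compare_new").notEqual(col("compare_old")), lit("change"))
        .otherwise(lit("nochange"));

Dataset<Row> labeled = combined.select(
        coalesce(newPacked.col("id"), oldPacked.col("id")).as("id"),
        status.as("status"));

labeled.groupBy("status").count().show();

If you also need the actual inserted/deleted/changed rows rather than just the counts, you can filter labeled on the status column instead of aggregating.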
Answered Dec 08 '25 by Ryan Widmaier