
Iterating an RDD and updating a mutable collection returns an empty collection

I am new to Scala and Spark and would like some help in understanding why the code below isn't producing my desired outcome.

I am comparing two tables.

My desired output schema is:

case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)

When I run the code below step by step manually, I actually end up with my desired outcome: a List[DiscrepancyData] completely populated with my desired output. However, I must be missing something, because the code below returns an empty list (before this code gets called, other code handles reading the tables from Hive, mapping, grouping, filtering, etc.):

val compareCols = Set("year", "nominal", "adjusted_for_inflation", "average_private_nonsupervisory_wage")

val key = "year"

import scala.collection.mutable.ListBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def compare(table: RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
    var discs: ListBuffer[DiscrepancyData] = ListBuffer()
    def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
        if (fieldOne != fieldTwo){
            DiscrepancyData(
                row1.getAs(key).toString, //fieldKey
                colName, //fieldName
                row1.getAs(colName).toString, //table1Value
                row2.getAs(colName).toString, //table2Value
                row2.getAs(colName).toString) //expectedValue
        }
        else null
    }
    def comparison(): Unit = {
        for(row <- table){
            var elem1 = row._2.head //gets the first element in the iterable
            var elem2 = row._2.tail.head //gets the second element in the iterable

            for(col <- compareCols){
                var value1 = elem1.getAs(col).toString
                var value2 = elem2.getAs(col).toString

                var disc = compareFields(value1, value2, col, elem1, elem2)

                if (disc != null) discs += disc
            }
        }
    }

    comparison()

    discs.toList
}

I'm calling the above function like this:

var outcome = compare(groupedFiltered)

Here is the data in groupedFiltered:

(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))

The table schema for groupedFiltered:

(year String, 
nominal Double,
adjusted_for_inflation Double, 
average_private_nonsupervisory_wage String)
asked Mar 11 '23 by adbar


1 Answer

Spark is a distributed computing engine. In addition to the classic single-node question of "what is the code doing", with Spark we also need to consider "where is the code running".

Let's inspect a simplified version of the expression above:

import scala.collection.mutable

val records: RDD[List[String]] = ??? // whatever data
val list: mutable.ListBuffer[String] = mutable.ListBuffer()
for { record <- records
      entry  <- record }
    { list += entry }

The Scala for-comprehension makes this look like a natural local computation, but in reality the RDD operation is serialized and "shipped" to the executors, where the inner operation is executed locally. We can rewrite the above like this:

records.foreach{ record =>     //RDD.foreach => serializes closure and executes remotely
     record.foreach{entry =>   //record.foreach => local operation on the record collection
        list += entry          // this mutable list object is updated in each executor but never sent back to the driver. All updates are lost  
     }
}

Mutable objects are in general a no-go in distributed computing. Imagine that one executor adds a record while another removes it: what is the correct result? Or that each executor ends up with a different value: which one is right?
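To make this concrete, here is a minimal sketch of the lost-update behavior, assuming a live SparkContext named sc:

import scala.collection.mutable.ListBuffer

val buffer = ListBuffer[Int]()
sc.parallelize(1 to 5).foreach(n => buffer += n) // each task mutates its own deserialized copy of the closure
println(buffer) // prints ListBuffer(): the driver's buffer was never updated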

To implement the operation above, we need to transform the data into our desired result.

I'd start by applying another best practice: do not use null as a return value. I also moved the row operations into the function. Let's rewrite the comparison operation with this in mind:

def compareFields(colName:String, row1:Row, row2:Row): Option[DiscrepancyData] = {
    val key = "year"
    val v1 = row1.getAs(colName).toString
    val v2 = row2.getAs(colName).toString
    if (v1 != v2){
        Some(DiscrepancyData(
            row1.getAs(key).toString, //fieldKey
            colName, //fieldName
            v1, //table1Value
            v2, //table2Value
            v2) //expectedValue
        )
    } else None
}
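As a quick, hypothetical sanity check, we can exercise compareFields on hand-built rows. GenericRowWithSchema is a Catalyst-internal class, but it is a convenient way to construct a Row that supports getAs by field name (in real code the rows come from the grouped RDD):

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Rows mirroring the 1991 pair from the question's sample data
val schema = StructType(
    Seq("year", "nominal", "adjusted_for_inflation", "average_private_nonsupervisory_wage")
        .map(StructField(_, StringType)))

val a = new GenericRowWithSchema(Array[Any]("1991", "7.14", "5.72", "39%"), schema)
val b = new GenericRowWithSchema(Array[Any]("1991", "4.14", "5.72", "39%"), schema)

compareFields("nominal", a, b) // Some(DiscrepancyData("1991", "nominal", "7.14", "4.14", "4.14"))
compareFields("year", a, b)    // None: the values match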

Now, we can rewrite the computation of discrepancies as a transformation of the initial table data:

val discrepancies = table.flatMap{ case (key, rows) =>
    val row1 = rows.head      // first row of the pair
    val row2 = rows.tail.head // second row of the pair
    compareCols.flatMap(col => compareFields(col, row1, row2))
}
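Note that this assumes each grouped Iterable contains exactly two rows, one per table, which matches the grouped data shown in the question.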

We can also use the for-comprehension notation, now that we understand where things are running:

val discrepancies = for {
    (key, rows) <- table
    col         <- compareCols
    dis         <- compareFields(col, rows.head, rows.tail.head)
} yield dis
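Both forms desugar into the same chain of flatMap calls on the RDD, so they distribute across the executors identically.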

Note that discrepancies is of type RDD[DiscrepancyData]. If we want to get the actual values to the driver we need to:

val materializedDiscrepancies = discrepancies.collect()
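collect pulls the entire result into driver memory, so for large outputs prefer take(n) to inspect a sample. As a hypothetical follow-up, the collected array can then be handled like any local collection:

materializedDiscrepancies
    .sortBy(_.fieldKey)
    .foreach(d => println(s"${d.fieldKey} ${d.fieldName}: ${d.val1} vs ${d.val2} (expected ${d.valExpected})"))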
answered Mar 13 '23 by maasg