
How to compare two datasets?

I am running a Spark application that reads data from a few Hive tables of IP addresses and compares each element (IP address) in one dataset with all the elements (IP addresses) from the other datasets. The end result would be something like:

+---------------+--------+--------+--------+--------+--------+--------+----------+
|     ip_address|dataset1|dataset2|dataset3|dataset4|dataset5|dataset6|      date|
+---------------+--------+--------+--------+--------+--------+--------+----------+
| xx.xx.xx.xx.xx|       1|       1|       0|       0|       0|       0|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       0|       0|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       1|       0|       1|       0|       0|2017-11-06|
+---------------+--------+--------+--------+--------+--------+--------+----------+

To do the comparison, I am converting the DataFrames returned by the hiveContext.sql("query") statements into fastutil objects, like this:

val df = hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

Then I iterate over each collection and write the rows to a file using a FileWriter.

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext) {
    val p = dfIterator.next()  // elements are already Strings
    // comparison logic
}

I am running the application with --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g.
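For reference, the full launch command looks roughly like this (the class and jar names here are placeholders, not the real ones):

spark-submit \
  --class com.example.IpCompare \
  --master yarn \
  --num-executors 20 \
  --executor-memory 16g \
  --executor-cores 5 \
  --driver-memory 20g \
  ip-compare.jar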

The process runs for about 18-19 hours in total for roughly 4-5 million records, doing one-to-one comparisons on a daily basis.

However, when I checked the Application Master UI, I noticed that no activity takes place after the initial conversion of the DataFrames to fastutil collection objects is done (this takes only a few minutes after the job is launched). The count and collect statements in the code produce new jobs until the conversion is done; after that, no new jobs are launched while the comparison is running.

  • What does this imply? Does it mean that the distributed processing is not happening at all?

  • I understand that collection objects are not treated as RDDs; could that be the reason?

  • How is Spark executing my program without using the assigned resources?

Any help would be appreciated, Thank you!

asked Mar 07 '18 by Hemanth


1 Answer

After the line:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

and especially this part of that line:

df.map(r => r(0).toString).collect()

the collect is the main thing to notice: from that point on, no Spark jobs are ever performed on dfBuffer, which is a regular local data structure living in a single JVM (the driver).

Does it mean that the distributed processing is not happening at all?

Correct. collect brings all the data onto the single JVM where the driver runs, which is exactly why you should not use it unless you know what you are doing and what problems it may cause.
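To make the boundary concrete, here is a small sketch mirroring your code (the column name is illustrative):

// This is a distributed computation: Spark plans jobs and stages for it.
val ips = df.select("ip_address").distinct()

// collect() ships every row to the driver. From here on, the code is
// plain single-JVM Scala, and the web UI shows no new jobs.
val local: Array[String] = ips.map(r => r.getString(0)).collect()
local.foreach { ip =>
  // runs sequentially on the driver only
}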

I think the above answers all the other questions.


A possible solution to your problem of comparing two datasets (in Spark, and in a distributed fashion) would be to join each dataset with the reference dataset and use count to check whether the number of records changed.
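A minimal sketch of that idea, assuming two DataFrames that each expose an ip_address column (the table and column names below are illustrative, not taken from your code):

val df1 = hiveContext.sql("SELECT ip_address FROM dataset1")
val df2 = hiveContext.sql("SELECT ip_address FROM dataset2")

// De-duplicate first so the inner join counts unique IP addresses.
// The join runs on the executors, so it shows up as jobs and stages
// in the web UI, unlike the collect-based loop.
val common = df1.distinct().join(df2.distinct(), "ip_address")

// Compare counts instead of collecting rows to the driver.
val matched = common.count()
val total = df1.distinct().count()
println(s"$matched of $total IP addresses from dataset1 also appear in dataset2")

Writing the result out with common.write (rather than a FileWriter on the driver) keeps the output side distributed as well.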

answered Sep 18 '22 by Jacek Laskowski