I am running a Spark application that reads data from a few Hive tables (IP addresses) and compares each element (IP address) in a dataset with all other elements (IP addresses) from the other datasets. The end result would be something like:
+---------------+--------+--------+--------+--------+--------+--------+----------+
|     ip_address|dataset1|dataset2|dataset3|dataset4|dataset5|dataset6|      date|
+---------------+--------+--------+--------+--------+--------+--------+----------+
| xx.xx.xx.xx.xx|       1|       1|       0|       0|       0|       0|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       0|       0|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       1|       0|       1|       0|       0|2017-11-06|
+---------------+--------+--------+--------+--------+--------+--------+----------+
For doing the comparison, I am converting the DataFrames resulting from the hiveContext.sql("query") statement into fastutil objects, like this:
val df = hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
Then, I am using an iterator to iterate over each collection and write the rows to a file using FileWriter.
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext) {
  val p = dfIterator.next().toString
  // logic
}
I am running the application with --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g.
The process runs daily and takes about 18-19 hours in total for roughly 4-5 million records with one-to-one comparisons.
However, when I checked the Application Master UI, I noticed that no activity takes place after the initial conversion of the DataFrames to fastutil collection objects is done (this takes only a few minutes after the job is launched). I see the count and collect statements used in the code producing new jobs until the conversion is done. After that, no new jobs are launched while the comparison is running.
What does this imply? Does it mean that the distributed processing is not happening at all?
I understand that collection objects are not treated as RDDs; could this be the reason?
How is Spark executing my program without using the resources assigned?
Any help would be appreciated. Thank you!
After the line:
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
and especially this part of it:
df.map(r => r(0).toString).collect()
where the collect is the key thing to notice, no Spark jobs are ever performed on dfBuffer (which is a regular, local, single-JVM data structure).
Does it mean that the distributed processing is not happening at all?
Correct. collect brings all the data to the single JVM where the driver runs (and that is exactly why you should not be doing it unless... you know what you are doing and what problems it may cause).
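To make the boundary concrete, here is a minimal sketch of where Spark stops and plain driver-side code begins (the ips and local names are just for illustration, assuming Spark 1.x where df.map returns an RDD):
val df = hiveContext.sql("query")     // DataFrame: distributed, lazily evaluated
val ips = df.map(r => r(0).toString)  // RDD[String]: still distributed
val local = ips.collect()             // Array[String]: the last Spark job; everything now sits on the driver
local.foreach { ip =>
  // From here on this is plain, single-threaded Scala on the driver;
  // the 20 executors sit idle no matter what happens in this loop.
}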
I think the above answers all the other questions.
A possible solution to your problem of comparing two datasets (in Spark and in a distributed fashion) would be to join a dataset with the reference dataset and then count the result to check whether the number of records changed.
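As a rough illustration of that idea, the membership matrix from your question could also be built entirely with joins, so the work stays on the executors. This is only a sketch under assumptions: the table names dataset1..dataset6 and the helper withFlag are hypothetical, and the three-argument join(..., "left_outer") variant requires Spark 1.6+:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{current_date, lit}

// Hypothetical source tables; adjust the names/queries to your schema.
val datasets: Seq[DataFrame] =
  (1 to 6).map(i => hiveContext.sql(s"SELECT ip_address FROM dataset$i"))

// Flag membership of each ip_address in `other` with a 0/1 column,
// without ever collecting the data to the driver.
def withFlag(ips: DataFrame, other: DataFrame, name: String): DataFrame =
  ips.join(other.distinct.withColumn(name, lit(1)), Seq("ip_address"), "left_outer")
    .na.fill(0, Seq(name))

// All distinct IPs seen in any dataset.
val allIps = datasets.reduce(_ unionAll _).distinct

// One 0/1 column per dataset plus the run date: the same shape as the
// table in the question, computed distributedly.
val result = datasets.zipWithIndex
  .foldLeft(allIps) { case (acc, (ds, i)) => withFlag(acc, ds, s"dataset${i + 1}") }
  .withColumn("date", current_date())
A final result.count() or a write back to Hive is then what triggers the distributed jobs, and that should show up as ongoing activity in the Application Master UI.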