I am using Spark 1.6.1.
I have a DataFrame whose rows I need to iterate through, writing each row to Kafka. Right now I'm doing something like this:
ProducerRecord<String, String> kafka_message;
for (Row x : my_df.collect()) {
    kafka_message = new ProducerRecord<String, String>(topic, String.valueOf(x));
    my_kafka_producer.send(kafka_message);
}
The problem is that collect() pulls all of the data to the driver, which then pushes it to Kafka. Given that I have roughly 250 executors, my single driver cannot handle the workload efficiently. So I want to know how I can iterate through a DataFrame on my executors instead, which means avoiding collect(). I found an article that roughly explains how to do it, but unfortunately its link to the GitHub code has expired, so I can't see how to implement it.
Article for reference: https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/
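In Java, you can try something like the following. First, extend AbstractFunction1 and implement Serializable, so that Spark can serialize the function and ship it to the executors: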
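import java.io.Serializable;
import scala.runtime.AbstractFunction1;

// A Scala Function1 that Java code can subclass and that Spark can serialize
abstract class MyFunction1<T, R> extends AbstractFunction1<T, R> implements Serializable {
}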
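Why the Serializable part matters: Spark serializes the function passed to foreachPartition on the driver and deserializes it on each executor, using Java serialization for closures. The plain-Java sketch below roughly mimics that round trip without Spark. SerializableFunction1, ClosureShippingDemo, and Connection are hypothetical names; SerializableFunction1 stands in for MyFunction1 because scala-library is not assumed to be on the classpath. It also shows a common pitfall: a function that captures a non-serializable object (for example, a producer created on the driver) cannot be shipped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MyFunction1: an abstract one-argument function
// that is Serializable, like AbstractFunction1 + Serializable above.
abstract class SerializableFunction1<T, R> implements Serializable {
    public abstract R apply(T input);
}

class ClosureShippingDemo {
    // Hypothetical stand-in for a non-serializable resource such as a client.
    static class Connection { }

    // Serializes and deserializes obj, roughly what happens when a closure
    // is shipped from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <X> X roundTrip(X obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (X) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if obj can be written with Java serialization.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Created in a static method, so the anonymous class has no enclosing
    // instance and captures nothing.
    static SerializableFunction1<String, Integer> lengthFunction() {
        return new SerializableFunction1<String, Integer>() {
            @Override
            public Integer apply(String input) {
                return input.length();
            }
        };
    }

    // Captures conn, so serializing the function also tries to serialize conn.
    static SerializableFunction1<String, Integer> capturing(final Connection conn) {
        return new SerializableFunction1<String, Integer>() {
            @Override
            public Integer apply(String input) {
                return conn.hashCode() + input.length();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(lengthFunction()).apply("kafka"));   // prints 5
        System.out.println(isSerializable(capturing(new Connection()))); // prints false
    }
}
```

Note that an anonymous class created inside an instance method can also capture the enclosing object, which would then need to be serializable as well.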
Then call foreachPartition on your DataFrame. The function runs on the executors, once for each partition, so the rows never pass through the driver:
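import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.BoxedUnit;

df.foreachPartition(new MyFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> rows) {
        // This body runs on an executor, once per partition
        while (rows.hasNext()) {
            Row row = rows.next();
            // process the row here, e.g. send String.valueOf(row) to Kafka
        }
        return BoxedUnit.UNIT;
    }
});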
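To fill in the Kafka part, a pattern that usually works is to open one producer per partition inside apply(), send each row, and close the producer when the partition is done. A producer created on the driver would have to be serialized into the closure, and KafkaProducer does not implement Serializable. The sketch below models that lifecycle in plain Java so it runs without Spark or Kafka: RecordSender, InMemorySender, and PartitionWriter are hypothetical names, a List<List<String>> stands in for the DataFrame's partitions, and a Supplier stands in for constructing a real producer. In actual code the supplier would be something like `new KafkaProducer<String, String>(props)`, and each send would wrap the row in a ProducerRecord.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka producer: opened, used, then closed.
interface RecordSender extends AutoCloseable {
    void send(String topic, String value);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

// Hypothetical in-memory sender that records what it was asked to send.
class InMemorySender implements RecordSender {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    @Override
    public void send(String topic, String value) {
        if (closed) {
            throw new IllegalStateException("sender already closed");
        }
        sent.add(topic + ":" + value);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class PartitionWriter {
    // Mirrors the body of apply(Iterator<Row> rows): one sender per partition,
    // one send per row, and the sender closed once the partition is exhausted.
    static int writePartition(Iterator<String> rows, String topic,
                              Supplier<? extends RecordSender> newSender) {
        int count = 0;
        try (RecordSender sender = newSender.get()) {
            while (rows.hasNext()) {
                sender.send(topic, rows.next());
                count++;
            }
        }
        return count;
    }

    // Stand-in for foreachPartition: calls writePartition once per partition.
    // Spark would run these calls on the executors, possibly in parallel.
    static int writeAll(List<List<String>> partitions, String topic,
                        Supplier<? extends RecordSender> newSender) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += writePartition(partition.iterator(), topic, newSender);
        }
        return total;
    }

    public static void main(String[] args) {
        List<InMemorySender> created = new ArrayList<>();
        List<List<String>> partitions =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        int sent = writeAll(partitions, "my-topic", () -> {
            InMemorySender s = new InMemorySender();
            created.add(s);
            return s;
        });
        System.out.println(sent + " records, " + created.size() + " senders"); // prints "3 records, 2 senders"
    }
}
```

Creating the producer per partition rather than per row keeps connection setup cost proportional to the number of partitions, not the number of rows.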