 

Iterate Through Spark DataFrame in Java without Collect

I am using Spark 1.6.1

I have a DataFrame that I need to iterate through and write each row to Kafka. As of right now I'm doing something like this:

// collect() pulls every row back to the driver before anything is sent
for (Row x : my_df.collect()) {
    ProducerRecord<String, String> kafka_message =
            new ProducerRecord<String, String>(topic, String.valueOf(x));
    my_kafka_producer.send(kafka_message);
}

The problem is that collect() sends all of the data to the driver, which then pushes it to Kafka. Given that I have roughly 250 executors, my single driver cannot handle the workload efficiently. So I want to know how I can iterate through a DataFrame on the executors instead, which means avoiding collect(). I found an article that roughly explains how to do it, but unfortunately its link to the GitHub code has expired, so I can't see how to implement it.

Article for reference: https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/

asked Jun 19 '26 17:06 by user3124181


1 Answer

In Java, you can try something like the following. First, extend Scala's AbstractFunction1 and make it Serializable:

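import java.io.Serializable;
import scala.runtime.AbstractFunction1;

// A Scala Function1 that Spark can serialize and ship to the executors
abstract class MyFunction1<T, R> extends AbstractFunction1<T, R> implements Serializable {
}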

Then call foreachPartition on your DataFrame. The function runs on the executors, once per partition, so the rows are never sent back to the driver:

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.BoxedUnit;

df.foreachPartition(new MyFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> rows) {
        while (rows.hasNext()) {
            // get the Row and process it here (e.g. send it to Kafka)
            Row row = rows.next();
        }
        return BoxedUnit.UNIT;
    }
});
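The loop above leaves the actual Kafka write as a comment. Below is a minimal sketch of how it might be filled in for the question's use case, assuming the Java producer client from kafka-clients (org.apache.kafka.clients.producer) and one string message per row, as in the question. The class name DataFrameToKafka, the method writeToKafka and its bootstrapServers parameter are hypothetical. The producer is created inside the function, once per partition, because KafkaProducer is not serializable and cannot be shipped from the driver, while creating one per row would be needlessly expensive.

```java
import java.io.Serializable;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

// Hypothetical helper class; the class and method names are illustrative.
public final class DataFrameToKafka {

    // Same helper as in the answer: a Scala Function1 that Spark can serialize.
    abstract static class MyFunction1<T, R> extends AbstractFunction1<T, R> implements Serializable {
    }

    // Sends every row of df to the given topic from the executors.
    // Nothing is collected to the driver; each partition gets its own producer.
    public static void writeToKafka(DataFrame df, final String topic, final String bootstrapServers) {
        // Defined in a static method, so the anonymous class captures only
        // the two Strings, not a (possibly non-serializable) outer instance.
        df.foreachPartition(new MyFunction1<Iterator<Row>, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Iterator<Row> rows) {
                Properties props = new Properties();
                props.put("bootstrap.servers", bootstrapServers);
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                // Created here, on the executor, because KafkaProducer is not Serializable.
                Producer<String, String> producer = new KafkaProducer<String, String>(props);
                try {
                    while (rows.hasNext()) {
                        Row row = rows.next();
                        producer.send(new ProducerRecord<String, String>(topic, String.valueOf(row)));
                    }
                } finally {
                    // close() waits for previously sent records to complete.
                    producer.close();
                }
                return BoxedUnit.UNIT;
            }
        });
    }
}
```

If the Scala types are inconvenient, Spark 1.6 also lets you call df.toJavaRDD().foreachPartition(...) with a VoidFunction<java.util.Iterator<Row>>; the per-partition producer pattern stays the same.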
answered Jun 22 '26 07:06 by abaghel


