How to traverse/iterate a Dataset in Spark Java?

I am trying to traverse a Dataset to do some string similarity calculations such as Jaro-Winkler or cosine similarity. Currently I convert my Dataset to a list of rows and traverse it with a for statement, which is not the efficient Spark way to do it. So I am looking for a better approach in Spark.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Sample {

    public static void main(String[] args) {
        // SparkSession is the single entry point in Spark 2.x; the separate
        // JavaSparkContext and SQLContext from Spark 1.x are not needed.
        SparkSession spark = SparkSession.builder()
                .appName("JavaTokenizerExample")
                .master("local[*]")
                .getOrCreate();

        List<Row> data = Arrays.asList(
                RowFactory.create("Mysore", "Mysuru"),
                RowFactory.create("Name", "FirstName"));
        StructType schema = new StructType(new StructField[] {
                new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
                new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });

        Dataset<Row> oldDF = spark.createDataFrame(data, schema);
        oldDF.show();

        // Pulls every row to the driver: the non-distributed approach I want to avoid.
        List<Row> rowsList = oldDF.collectAsList();
    }
}

I have found many JavaRDD examples, but they are not clear to me. An example for Dataset would help me a lot.

asked Mar 13 '17 by Abhishek Vk

People also ask

How do I iterate over dataset in spark?

In Spark, foreach() is an action available on RDD, DataFrame, and Dataset that iterates over each element of the data. It is similar to a for loop, except that it runs distributed across the cluster.

How foreach works in Spark?

The foreach() operation is an action: it returns no value and executes the supplied function once on each element of the RDD.
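For instance, a minimal sketch of foreach on a JavaRDD (sc is assumed to be an existing JavaSparkContext):

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// foreach is an action: it triggers execution and returns nothing.
// On a cluster, the println runs on the executors, not the driver.
JavaRDD<String> words = sc.parallelize(Arrays.asList("Mysore", "Mysuru"));
words.foreach(word -> System.out.println(word));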


2 Answers

You can use org.apache.spark.api.java.function.ForeachFunction like below.

oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));
answered Nov 15 '22 by abaghel
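
For the string-similarity calculations in the question, a transformation such as map is often a better fit than foreach, because it returns a new Dataset of results instead of only producing side effects. A minimal sketch, assuming Apache Commons Text is on the classpath for JaroWinklerSimilarity (any similarity function would work the same way) and oldDF is the Dataset from the question:

import org.apache.commons.text.similarity.JaroWinklerSimilarity;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Produce a new Dataset of similarity scores instead of a side effect.
// The JaroWinklerSimilarity instance is created inside the lambda so the
// closure does not capture anything non-serializable.
Dataset<Double> scores = oldDF.map(
        (MapFunction<Row, Double>) row ->
                new JaroWinklerSimilarity().apply(row.getString(0), row.getString(1)),
        Encoders.DOUBLE());
scores.show();

The resulting Dataset can then be shown, saved, or joined back to the original, all without collecting rows to the driver.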


For older JDKs that don't support lambda expressions (Java 7 and earlier), you can use an anonymous VoidFunction after importing:

import org.apache.spark.api.java.function.VoidFunction;

yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
    @Override
    public void call(Row r) throws Exception {
        // getAs looks the value up by column name.
        System.out.println(r.getAs("your column name here"));
    }
});
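
Note that with either approach, System.out.println runs on the executors when the job is submitted to a cluster, so the output appears in the executor logs rather than on the driver console. For small results, collecting with collectAsList() or calling show() on the driver is often more convenient for inspection.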
answered Nov 16 '22 by DigitalFox