Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing custom Spark RDD in Java

I have a custom data source and I want to load the data into my Spark cluster to perform some computations. For this I see that I might need to implement a new RDD for my data source.

I am a complete Scala noob and I am hoping that I can implement the RDD in Java itself. I looked around the internet and could not find any resources. Any pointers?

My data is in S3 and is indexed in Dynamo. For example, If I want to load data given a time range, I will first need to query Dynamo for the S3 file keys for the corresponding time range and then load them in Spark. The files may not always have the same S3 path prefix so sc.testFile("s3://directory_path/") won't work.

I am looking for pointers on how to implement something analogous to HadoopRDD or JdbcRDD but in Java. Something similar to what they have done here: DynamoDBRDD. This one reads data from Dynamo, my custom RDD would query DynamoDB for the S3 file keys, and then load them from S3.

like image 717
Swaranga Sarma Avatar asked May 25 '15 22:05

Swaranga Sarma


1 Answers

You can extend RDD in Java and implement the getPartitions and compute methods.

Java can extend Scala classes with some limitations.

Example:

package com.openmarket.danyal;
// Other imports left out
import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

public class AlphaTest {
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs");
        try(JavaSparkContext sc = new JavaSparkContext(conf)) {
            System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect());
        }
    }

    public static class AlphabetRDD extends RDD<String> {
        private static final long serialVersionUID = 1L;

        public AlphabetRDD(SparkContext sc) {
            super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG);
        }

        @Override
        public Iterator<String> compute(Partition arg0, TaskContext arg1) {
            AlphabetRangePartition p = (AlphabetRangePartition)arg0;
            return new CharacterIterator(p.from, p.to);
        }

        @Override
        public Partition[] getPartitions() {
            return new Partition[] {new AlphabetRangePartition(1, 'A', 'M'), new AlphabetRangePartition(2, 'P', 'Z')};
        }

    }

    /**
     * A partition representing letters of the Alphabet between a range
     */
    public static class AlphabetRangePartition implements Partition {
        private static final long serialVersionUID = 1L;
        private int index;
        private char from;
        private char to;

        public AlphabetRangePartition(int index, char c, char d) {
            this.index = index;
            this.from = c;
            this.to = d;
        }

        @Override
        public int index() {
            return index;
        }

        @Override
        public boolean equals(Object obj) {
            if(!(obj instanceof AlphabetRangePartition)) {
                return false;
            }
            return ((AlphabetRangePartition)obj).index != index;
        }

        @Override
        public int hashCode() {
            return index();
        }
    }

    /**
     * Iterators over all characters between two characters
     */
    public static class CharacterIterator extends AbstractIterator<String> {
        private char next;
        private char last;

        public CharacterIterator(char from, char to) {
            next = from;
            this.last = to;
        }

        @Override
        public boolean hasNext() {
            return next <= last;
        }

        @Override
        public String next() {
            // Post increments next after returning it
            return Character.toString(next++);
        }
    }
}
like image 190
DanyalBurke Avatar answered Sep 18 '22 01:09

DanyalBurke