Apache Spark with custom InputFormat for HadoopRDD

I am currently working on Apache Spark. I have implemented a custom InputFormat for Apache Hadoop that reads key-value records over TCP sockets. I want to port this code to Apache Spark and use it with the hadoopRDD() function. My Apache Spark code is as follows:

import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

import scala.Tuple2;

// CustomInputFormat and CustomConf are my own classes (imports omitted).

public final class SparkParallelDataLoad {

    public static void main(String[] args) {
        int iterations = 100;
        String dbNodesLocations = "";
        if(args.length < 3) {
            System.err.printf("Usage ParallelLoad <coordinator-IP> <coordinator-port> <numberOfSplits>\n");
            System.exit(1);
        }
        JobConf jobConf = new JobConf();
        jobConf.set(CustomConf.confCoordinatorIP, args[0]);
        jobConf.set(CustomConf.confCoordinatorPort, args[1]);
        jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

        int numOfSplits = Integer.parseInt(args[2]);

        CustomInputFormat.setCoordinatorIp(args[0]);
        CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

        SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(jobConf, 
                CustomInputFormat.class, LongWritable.class, Text.class, 
                numOfSplits);

        JavaRDD<LabeledPoint> points = records.map(new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {

            private final Log log = LogFactory.getLog(Function.class);
            private static final long serialVersionUID = -1771348263117622186L;

            private final Pattern SPACE = Pattern.compile(" ");
            @Override
            public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
                    throws Exception {
                if(tuple == null || tuple._1() == null || tuple._2() == null)
                    return null;
                double y = Double.parseDouble(Long.toString(tuple._1.get()));
                String[] tok = SPACE.split(tuple._2.toString());
                double[] x = new double[tok.length];
                for (int i = 0; i < tok.length; ++i) {
                    if (!tok[i].isEmpty())
                        x[i] = Double.parseDouble(tok[i]);
                }
                return new LabeledPoint(y, Vectors.dense(x));
            }

        });

        System.out.println("Number of records: " + points.count());
        LinearRegressionModel model = LinearRegressionWithSGD.train(points.rdd(), iterations);
        System.out.println("Model weights: " + model.weights());

        sc.stop();
    }
}

In my project I also have to decide which Spark worker will connect to which data source (something like a "matchmaking" process with a 1:1 relation). Therefore, I create a number of InputSplits equal to the number of data sources, so that my data are sent to the SparkContext in parallel.
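To make that concrete, below is a rough sketch of the shape such an InputFormat could take under the old org.apache.hadoop.mapred API; the class names, the configuration key, and the "host:port" list format are illustrative placeholders, not my actual implementation:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only: one InputSplit per data source, so that each
// partition (and the task that computes it) reads from exactly one source.
public class SocketInputFormatSketch implements InputFormat<LongWritable, Text> {

    // Minimal split that just remembers which "host:port" it should read from.
    public static class SocketSplit implements InputSplit {
        private String source;

        public SocketSplit() { }                       // no-arg constructor for Writable
        public SocketSplit(String source) { this.source = source; }

        @Override
        public long getLength() { return 0; }          // unknown for a socket stream;
                                                       // this is the value question 1 below asks about
        @Override
        public String[] getLocations() { return new String[0]; } // no locality preference

        @Override
        public void write(DataOutput out) throws IOException { out.writeUTF(source); }
        @Override
        public void readFields(DataInput in) throws IOException { source = in.readUTF(); }
    }

    @Override
    public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
        // Hypothetical key holding a comma-separated "host:port" list, one entry per source.
        String[] sources = conf.get("custom.data.sources", "").split(",");
        InputSplit[] splits = new InputSplit[sources.length];
        for (int i = 0; i < sources.length; i++) {
            splits[i] = new SocketSplit(sources[i]);
        }
        // numSplits is only a hint; the resulting RDD has sources.length partitions.
        return splits;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf conf, Reporter reporter) throws IOException {
        // The real reader would open a TCP connection to the split's source;
        // omitted here to keep the sketch short.
        throw new UnsupportedOperationException("reader omitted in this sketch");
    }
}

My questions are the following: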

  1. Does the result of the InputSplit.getLength() method affect how many records a RecordReader returns? Specifically, in my test runs I have seen a job end after returning only one record, simply because CustomInputSplit.getLength() returned 0 (see the reader sketch after these questions).

  2. In the Apache Spark context, is the number of workers equal to the number of InputSplits produced by my InputFormat, at least for the execution of the records.map() function call?
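
Regarding question 1, here is a stripped-down sketch of the kind of socket-backed RecordReader I have in mind; the line-based protocol and names are illustrative, not my actual code:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

// Illustrative sketch: records are produced as long as next() returns true,
// and the stream ends the split when the remote side closes the connection.
public class SocketRecordReaderSketch implements RecordReader<LongWritable, Text> {

    private final Socket socket;
    private final BufferedReader in;
    private long pos = 0;

    public SocketRecordReaderSketch(String host, int port) throws IOException {
        socket = new Socket(host, port);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        String line = in.readLine();   // one record per line (illustrative protocol)
        if (line == null) {
            return false;              // end of stream ends the split
        }
        key.set(pos++);
        value.set(line);
        return true;
    }

    @Override
    public LongWritable createKey() { return new LongWritable(); }

    @Override
    public Text createValue() { return new Text(); }

    @Override
    public long getPos() { return pos; }

    @Override
    public float getProgress() { return 0.0f; }   // unknown for a stream

    @Override
    public void close() throws IOException {
        in.close();
        socket.close();
    }
}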

The answer to question 2 above is really important for my project.

Thank you, Nick

asked Jul 15 '14 by nick.katsip

1 Answer

Yes. Spark's sc.hadoopRDD will create an RDD with as many partitions as reported by InputFormat.getSplits.

The last argument to hadoopRDD, called minPartitions (numOfSplits in your code), is used as a hint to InputFormat.getSplits. But the number of splits returned by getSplits is respected, whether it is greater or smaller than the hint.

See the code at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L168
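
If you want to verify this, you can print the partition count of the RDD after calling hadoopRDD; it should match whatever your CustomInputFormat.getSplits returned, not necessarily the numOfSplits hint (a quick check, assuming the records RDD from your code):

// Should equal the number of splits returned by CustomInputFormat.getSplits,
// which may differ from the numOfSplits hint passed to hadoopRDD.
System.out.println("Number of partitions: " + records.partitions().size());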

answered Sep 21 '22 by Daniel Darabos