I am currently working with Apache Spark. I have implemented a custom InputFormat for Apache Hadoop that reads key-value records over TCP sockets. I want to port this code to Apache Spark and use it with the hadoopRDD() function. My Apache Spark code is as follows:
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

import scala.Tuple2;

// CustomConf and CustomInputFormat are my own classes (same package).
public final class SparkParallelDataLoad {

    public static void main(String[] args) {
        int iterations = 100;
        String dbNodesLocations = "";

        if (args.length < 3) {
            System.err.printf("Usage ParallelLoad <coordinator-IP> <coordinator-port> <numberOfSplits>\n");
            System.exit(1);
        }

        // Pass the coordinator's location to the custom InputFormat through the JobConf.
        JobConf jobConf = new JobConf();
        jobConf.set(CustomConf.confCoordinatorIP, args[0]);
        jobConf.set(CustomConf.confCoordinatorPort, args[1]);
        jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

        int numOfSplits = Integer.parseInt(args[2]);

        CustomInputFormat.setCoordinatorIp(args[0]);
        CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

        SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // numOfSplits is passed as the minPartitions hint to hadoopRDD().
        JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(jobConf,
                CustomInputFormat.class, LongWritable.class, Text.class,
                numOfSplits);

        // Parse each "key value1 value2 ..." record into a LabeledPoint for MLlib.
        JavaRDD<LabeledPoint> points = records.map(
                new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {

            private static final long serialVersionUID = -1771348263117622186L;

            private final Log log = LogFactory.getLog(Function.class);
            private final Pattern SPACE = Pattern.compile(" ");

            @Override
            public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
                    throws Exception {
                if (tuple == null || tuple._1() == null || tuple._2() == null)
                    return null;
                double y = Double.parseDouble(Long.toString(tuple._1().get()));
                String[] tok = SPACE.split(tuple._2().toString());
                double[] x = new double[tok.length];
                for (int i = 0; i < tok.length; ++i) {
                    if (!tok[i].isEmpty())
                        x[i] = Double.parseDouble(tok[i]);
                }
                return new LabeledPoint(y, Vectors.dense(x));
            }
        });

        System.out.println("Number of records: " + points.count());

        LinearRegressionModel model =
                LinearRegressionWithSGD.train(points.rdd(), iterations);

        System.out.println("Model weights: " + model.weights());

        sc.stop();
    }
}
In my project I also have to decide which Spark worker will connect to which data source (something like a matchmaking process with a 1:1 relation). Therefore, I create a number of InputSplits equal to the number of data sources, so that my data are sent to the SparkContext in parallel.
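To illustrate the idea, here is a simplified, hypothetical sketch (this is not my actual code; the class, getter, and configuration-value names are only illustrative, and it uses the old org.apache.hadoop.mapred API that hadoopRDD() expects). Each split only carries the address of one data source:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.mapred.InputSplit;

// One split per data source: the split carries the host/port a task should connect to.
public class SocketInputSplit implements InputSplit {

    private String host;
    private int port;

    public SocketInputSplit() { }                     // no-arg constructor required for Writable
    public SocketInputSplit(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Used by the RecordReader to open the socket.
    public String getHost() { return host; }
    public int getPort()    { return port; }

    @Override
    public long getLength() throws IOException {
        return 1L;                                    // nominal, non-zero size for a socket-backed split
    }

    @Override
    public String[] getLocations() throws IOException {
        return new String[] { host };                 // preferred-location hint for scheduling
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(host);
        out.writeInt(port);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        host = in.readUTF();
        port = in.readInt();
    }
}

and, inside the InputFormat, a getSplits() along these lines:

// Ignore the numSplits hint and emit exactly one split per configured data source.
@Override
public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
    // Illustrative: assumes the source addresses are stored as "host1:4000,host2:4000,..."
    String[] sources = conf.get(CustomConf.confDBNodesLocations).split(",");
    InputSplit[] splits = new InputSplit[sources.length];
    for (int i = 0; i < sources.length; i++) {
        String[] hostPort = sources[i].split(":");
        splits[i] = new SocketInputSplit(hostPort[0], Integer.parseInt(hostPort[1]));
    }
    return splits;
}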
My questions are the following:

1. Does the result of the InputSplit.getLength() method affect how many records a RecordReader returns? In detail, in my test runs I have seen a job end after returning only one record, apparently because CustomInputSplit.getLength() returned 0.
2. In the Apache Spark context, is the number of workers equal to the number of InputSplits produced by my InputFormat, at least for the execution of the records.map() call?
The answer to question 2 above is really important for my project.
Thank you, Nick
Yes. Spark's sc.hadoopRDD will create an RDD with as many partitions as reported by InputFormat.getSplits.

The last argument to hadoopRDD, called minPartitions (numOfSplits in your code), is used as a hint to InputFormat.getSplits, but whatever number of splits getSplits returns will be respected, whether it is greater or smaller than the hint.
See the code at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L168
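A quick way to check this in your own program (reusing the records variable from your main(); partitions() is the standard Java RDD API) is to print the partition count right after creating the RDD:

// Right after the sc.hadoopRDD(...) call in your main():
// one partition is created per InputSplit returned by CustomInputFormat.getSplits(),
// regardless of the minPartitions (numOfSplits) hint.
System.out.println("Partitions: " + records.partitions().size());

// records.map(...) then runs as one task per partition.

If that count is not what you expect, the thing to adjust is your getSplits() implementation, not the numOfSplits argument.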