Developing a spark streaming application

So the problem I'm trying to tackle is the following:

  • I need a data source that emits messages at a certain frequency
  • There are N neural nets that need to process each message individually
  • The outputs from all neural nets are aggregated, and only when all N outputs for a message have been collected should that message be declared fully processed
  • At the end, I should measure the time it took for a message to be fully processed (the time between when it was emitted and when all N neural net outputs for that message have been collected)
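The completion rule in the last two points comes down to per-message bookkeeping. Below is a minimal plain-Java sketch, independent of Spark, assuming each message carries a numeric id and its emission timestamp in milliseconds; the CompletionTracker class and its record method are hypothetical names, not part of the original code. It counts outputs per message and reports the latency only when the N-th output arrives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper (not from the original post): counts neural-net outputs
// per message and reports end-to-end latency once all N outputs have arrived.
// Not thread-safe; it only illustrates the bookkeeping.
final class CompletionTracker {
    private final int numberOfNets;
    // messageId -> number of outputs collected so far
    private final Map<Long, Integer> seen = new HashMap<>();

    CompletionTracker(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    // Records one net's output for a message. Returns the latency in ms
    // (nowMillis - emittedAtMillis) if this was the N-th output, else empty.
    OptionalLong record(long messageId, long emittedAtMillis, long nowMillis) {
        int count = seen.merge(messageId, 1, Integer::sum);
        if (count < numberOfNets) {
            return OptionalLong.empty();
        }
        seen.remove(messageId); // fully processed: drop its counter
        return OptionalLong.of(nowMillis - emittedAtMillis);
    }

    public static void main(String[] args) {
        CompletionTracker tracker = new CompletionTracker(3);
        // Message 7 emitted at t=1000 ms; its three outputs arrive at 1010, 1025, 1040.
        System.out.println(tracker.record(7L, 1000L, 1010L)); // OptionalLong.empty
        System.out.println(tracker.record(7L, 1000L, 1025L)); // OptionalLong.empty
        System.out.println(tracker.record(7L, 1000L, 1040L)); // OptionalLong[40]
    }
}
```

In a streaming job this state would have to live wherever all N outputs for a message meet; the sketch deliberately leaves that placement open.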

I'm curious as to how one would approach such a task using spark streaming.

My current implementation uses three types of components: a custom receiver and two classes that implement Function (one for the neural nets, one for the final aggregator).

In broad strokes, my application is built as follows:

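// Custom receiver that emits the messages
JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

// Shared aggregator that waits for the outputs of all numberOfNets nets
Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);

// One map + foreachRDD per net: each iteration registers a separate output operation
for (int i = 0; i < numberOfNets; i++) {
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}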

The main problem I'm having with this, though, is that it runs faster in local mode than when submitted to a 4-node cluster.

Is my implementation wrong to begin with, or is something else happening here?

There's also a fuller post at http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html with more details on the implementation of each of the three components mentioned above.

asked Mar 19 '26 23:03 by andrei


1 Answer

It seems there might be a lot of repetitive instantiation and serialization of objects. The latter might be what is hurting your performance on the cluster.

You should try instantiating your neural networks only once. You will have to ensure that they are serializable. You should use flatMap instead of multiple maps + union. Something along these lines:

// Initialize the neural nets once, on the driver. NeuralNetMapper must be
// Serializable, because the list is shipped to the executors with the closure.
final List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for (int i = 0; i < numberOfNets; i++) {
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}

// Then create a single DStream that applies every net to each item.
// A lambda (instead of an anonymous inner class) does not capture the
// enclosing instance, so that instance does not need to be serializable.
JavaDStream<Result> neuralNetResults = rndLists.flatMap(item -> {
    List<Result> results = new ArrayList<>(neuralNetMappers.size());
    for (NeuralNetMapper mapper : neuralNetMappers) {
        results.add(mapper.doYourNeuralNetStuff(item));
    }
    return results; // Spark 1.x FlatMapFunction returns an Iterable
});

// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);

If you can afford to initialize the networks this way, you can save quite a lot of time. Also, the union stuff in your linked post seems unnecessary and is penalizing your performance: a single flatMap will do.
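To see why the union is unnecessary, here is a plain-Java analogue of the two pipeline shapes, using java.util.stream instead of DStreams (no Spark involved). The string-producing "nets" and the FanOutDemo class are hypothetical stand-ins. Both shapes produce one result per (item, net) pair and differ only in ordering; the flatMap version makes a single pass over the items instead of one pass per net.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical demo (no Spark): N map passes + union vs. one flatMap.
final class FanOutDemo {

    // One map pass per net, then concatenate ("union") the N result lists.
    static List<String> mapThenUnion(List<Integer> items, List<Function<Integer, String>> nets) {
        List<String> union = new ArrayList<>();
        for (Function<Integer, String> net : nets) {
            union.addAll(items.stream().map(net).collect(Collectors.toList()));
        }
        return union;
    }

    // Single pass: each item is expanded into N results, one per net.
    static List<String> singleFlatMap(List<Integer> items, List<Function<Integer, String>> nets) {
        return items.stream()
                .flatMap(item -> nets.stream().map(net -> net.apply(item)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<Integer, String>> nets = Arrays.asList(
                x -> "netA(" + x + ")",
                x -> "netB(" + x + ")");
        List<Integer> items = Arrays.asList(1, 2);

        System.out.println(mapThenUnion(items, nets));  // [netA(1), netA(2), netB(1), netB(2)]
        System.out.println(singleFlatMap(items, nets)); // [netA(1), netB(1), netA(2), netB(2)]
    }
}
```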

Finally, to tune performance on the cluster further, you can switch to the Kryo serializer by setting spark.serializer to org.apache.spark.serializer.KryoSerializer.

answered Mar 22 '26 16:03 by smola