Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Streaming: Micro batches Parallel Execution

We are receiving data in spark streaming from Kafka. Once execution has been started in Spark Streaming, it executes only one batch and the remaining batches starting queuing up in Kafka.

Our data is independent and can be processes in Parallel.

We tried multiple configurations with multiple executor, cores, back pressure and other configurations but nothing worked so far. There are a lot messages queued and only one micro batch has been processed at a time and rest are remained in queue.

We want to achieve parallelism at maximum, so that not any micro batch is queued, as we have enough resources available. So how we can reduce time by maximum utilization of resources.

enter image description here

// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

    // Decode each binary message and generate JSON array
        JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(byte[] asn1Data) throws Exception {
                if(asn1Data.length > 0) {
                    try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
                            Writer writer = new StringWriter(); ) {


                        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
                        GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

                        byte[] buffer = new byte[1024];
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                        int len;
                        while((len = gzipInputStream.read(buffer)) != -1) {
                            byteArrayOutputStream.write(buffer, 0, len);
                        }


                        return new String(byteArrayOutputStream.toByteArray());


                    } catch (Exception e) {
//                      
                        producer.flush();

                        throw e;
                    }
                } 

                return null;
            }
        });




// publish generated json gzip to kafka 
        cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
                //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
                if(!jsonRdd4DF.isEmpty()) {
                    //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
                    Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);   

                    SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
                    airMainJsonProcessor.processAIRData(json, sparkSession);
                }

            }               
        });

        getJavaStreamingContext().start();
        getJavaStreamingContext().awaitTermination();
        getJavaStreamingContext().stop();

Technology that we are using:

HDFS  2.7.1.2.5 
YARN + MapReduce2  2.7.1.2.5 
ZooKeeper  3.4.6.2.5 
Ambari Infra  0.1.0 
Ambari Metrics  0.1.0 
Kafka  0.10.0.2.5 
Knox  0.9.0.2.5 
Ranger  0.6.0.2.5 
Ranger KMS  0.6.0.2.5 
SmartSense  1.3.0.0-1
Spark2  2.0.x.2.5 

Statistics that we got from difference experimentations:

Experiment 1

num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 48 Minutes

Experiment 2

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 8 Minutes

Experiment 3

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 7 Minutes

Experiment 4

spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 10 Minutes

Please advise, how we can process maximum so no queued.

like image 207
Imran Avatar asked Jul 13 '17 15:07

Imran


People also ask

Does Spark streaming support batch operations?

Apache Spark Streaming is a scalable fault-tolerant streaming processing system that natively supports both batch and streaming workloads.

Does Spark use parallel processing?

Spark uses Resilient Distributed Datasets (RDD) to perform parallel processing across a cluster or computer processors. It has easy-to-use APIs for operating on large datasets, in various programming languages. It also has APIs for transforming data, and familiar data frame APIs for manipulating semi-structured data.

Is Spark streaming micro batch?

Micro-batch loading technologies include Fluentd, Logstash, and Apache Spark Streaming.

What is batch interval in Spark streaming?

A batch interval tells spark that for what duration you have to fetch the data, like if its 1 minute, it would fetch the data for the last 1 minute. source: spark.apache.org. So the data would start pouring in a stream in batches, this continuous stream of data is called DStream.


2 Answers

I was facing same issue and I tried a few things in trying to resolve the issue and came to following findings:

First of all. Intuition says that one batch must be processed per executor but on the contrary, only one batch is processed at a time but jobs and tasks are processed in parallel.

Multiple batch processing can be achieved by using spark.streaming.concurrentjobs, but it's not documented and still needs a few fixes. One of problems is with saving Kafka offsets. Suppose we set this parameter to 4 and 4 batches are processed in parallel, what if 3rd batch finishes before 4th one, which Kafka offsets would be committed. This parameter is quite useful if batches are independent.

spark.default.parallelism because of its name is sometimes considered to make things parallel. But its true benefit is in distributed shuffle operations. Try different numbers and find an optimum number for this. You will get a considerable difference in processing time. It depends upon shuffle operations in your jobs. Setting it too high would decrease the performance. It's apparent from you experiments results too.

Another option is to use foreachPartitionAsync in place of foreach on RDD. But I think foreachPartition is better as foreachPartitionAsync would queue up the jobs whereas batches would appear to be processed but their jobs would still be in the queue or in processing. May be I didn't get its usage right. But it behaved same in my 3 services.

FAIR spark.scheduler.mode must be used for jobs with lots of tasks as round-robin assignment of tasks to jobs, gives opportunity to smaller tasks to start receiving resources while bigger tasks are processing.

Try to tune your batch duration+input size and always keep it below processing duration otherwise you're gonna see a long backlog of batches.

These are my findings and suggestions, however, there are so many configurations and methods to do streaming and often one set of operation doesn't work for others. Spark Streaming is all about learning, putting your experience and anticipation together to get to a set of optimum configuration.

Hope it helps. It would be a great relief if someone could tell specifically how we can legitimately process batches in parallel.

like image 61
Shahzad Avatar answered Oct 12 '22 13:10

Shahzad


We want to achieve parallelism at maximum, so that not any micro batch is queued

That's the thing about stream processing: you process the data in the order it was received. If you process your data at the rate slower than it arrives it will be queued. Also, don't expect that processing of one record will suddenly be parallelized across multiple nodes.

From your screenshot, it seems your batch time is 10 seconds and your producer published 100 records over 90 seconds.

It took 36s to process 2 records and 70s to process 17 records. Clearly, there is some per-batch overhead. If this dependency is linear, it would take only 4:18 to process all 100 records in a single mini-batch thus beating your record holder.

Since your code is not complete, it's hard to tell what exactly takes so much time. Transformations in the code look fine but probably the action (or subsequent transformations) are the real bottlenecks. Also, what's with producer.flush() which wasn't mentioned anywhere in your code?

like image 33
nonsleepr Avatar answered Oct 12 '22 15:10

nonsleepr