
Batch Size in Spark Streaming

I am new to Spark and Spark Streaming, and I am working on Twitter streaming data. My task involves handling each tweet independently, for example counting the number of words in each tweet. From what I have read, each input batch forms an RDD in Spark Streaming. So if I set a batch interval of 2 seconds, the new RDD contains all the tweets from those two seconds, any transformation is applied to the whole two seconds of data, and there is no way to deal with the individual tweets within it. Is my understanding correct? Or does each tweet form a new RDD? I am kind of confused...

Naren asked Jun 28 '15 01:06




1 Answer

In each batch you have one RDD containing all the statuses that arrived during the 2-second interval. You can then process these statuses individually. Here is a brief example:

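    // ctx (JavaStreamingContext), builder and filters are assumed to be set up earlier.
    JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);

    // foreachRDD runs once per batch; it replaces the deprecated foreach.
    // On Spark 2.x and later the callback type is VoidFunction2 instead of Function2.
    inputDStream.foreachRDD(new Function2<JavaRDD<Status>, Time, Void>() {
        @Override
        public Void call(JavaRDD<Status> rdd, Time time) throws Exception {
            // collect() pulls the whole batch to the driver; fine for a small demo.
            List<Status> statuses = rdd.collect();
            for (Status st : statuses) {
                // Process and store each status individually
                System.out.println("STATUS:" + st.getText() + " user:" + st.getUser().getId());
            }
            return null;
        }
    });

    ctx.start();
    ctx.awaitTermination();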
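For the word-count case in the question, the per-tweet step could look roughly like the sketch below. It is plain Java with no Spark dependency, so a `List<String>` stands in for one batch, and the names `TweetWordCount`, `countWords` and `countWordsPerTweet` are made up for illustration. In a real job the same `countWords` logic could be passed to `map` on the DStream, which applies it to every tweet in every batch without collecting anything to the driver.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spark or twitter4j.
class TweetWordCount {

    // Counts whitespace-separated words in the text of a single tweet.
    static int countWords(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    // Stand-in for one micro-batch: each element is handled on its own,
    // which is what a map transformation does to the elements of an RDD.
    static List<Integer> countWordsPerTweet(List<String> batch) {
        return batch.stream()
                .map(TweetWordCount::countWords)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
                "spark streaming is micro-batch", "hello world", "");
        System.out.println(countWordsPerTweet(batch)); // prints [4, 2, 0]
    }
}
```

The point is that a batch groups tweets for scheduling only; element-wise transformations still see one tweet at a time.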

I hope I didn't misunderstand your question.

Zoran

zoran jeremic answered Nov 01 '22 04:11
