
How can I make Spark Streaming count the words in a file in a unit test?

I've successfully built a very simple Spark Streaming application in Java that is based on the HdfsCount example in Scala.

When I submit this application to my local Spark, it waits for a file to be written to a given directory, and when I create that file it successfully prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to create a very basic unit test for this functionality, but the test does not print the same information, namely the number of words.

What am I missing?

Below is the unit test file, and after that I've also included the code snippet that shows the countWords method:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}

This test compiles and starts to run. Spark Streaming prints a lot of diagnostic messages on the console, but the call to wordCounts.print() does not print anything, whereas the same call in StarterApp.java itself does.

I've also tried adding ssc.awaitTermination(); after ssc.start(), but nothing changed in that respect. I then tried manually creating a new file in the directory that this Spark Streaming application was watching, but this time it gave an error.

For completeness, below is the countWords method:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
  JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
  });

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
      }).reduceByKey((i1, i2) -> i1 + i2);

  return wordCounts;
}
Emre Sevinç asked Nov 01 '22 13:11



1 Answer

A few pointers:

  • Give the Spark Streaming context at least 2 cores: one for streaming and one for Spark processing. Change "local" to "local[2]".
  • Your batch interval is 3000 ms, so somewhere in your program you need to wait at least that long before expecting any output.
  • Spark Streaming needs some time to set up its listeners. The file is created immediately after ssc.start() is called, so there's no guarantee that the filesystem listener is already in place. I'd add a sleep(xx) after ssc.start().

In Streaming, it's all about the right timing.
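
The waiting described in the pointers above can be made a little more robust than a single fixed sleep. Below is a minimal sketch in plain Java (no Spark dependency) of a polling helper that a test could call after ssc.start(): it waits until a condition holds or a timeout expires. The class name StreamingTestTiming, the methods waitUntil and minimumWaitMs, and the 500 ms startup grace period are all hypothetical, chosen only for illustration. In a real test the condition would check something your own code fills in, for example a thread-safe collection populated by an output operation such as foreachRDD. Note that output operations (including print()) have to be registered before ssc.start() is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class StreamingTestTiming {

    /**
     * Polls the condition every pollMs milliseconds until it returns true
     * or timeoutMs milliseconds have elapsed. Returns true if the condition
     * was met, false on timeout or if the waiting thread was interrupted.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long start = System.nanoTime();
        long timeoutNanos = timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Lower bound on how long a test should be prepared to wait:
     * one batch interval plus an assumed grace period for listener setup.
     */
    public static long minimumWaitMs(long batchIntervalMs, long startupGraceMs) {
        return batchIntervalMs + startupGraceMs;
    }

    public static void main(String[] args) {
        // Stand-in for "a batch has produced output": a flag set by another
        // thread after a short delay. In a Spark test this would be replaced
        // by a check on results collected by an output operation.
        AtomicBoolean ready = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ready.set(true);
        });
        producer.start();

        boolean met = waitUntil(ready::get, 2000, 50);
        System.out.println("condition met: " + met);
        System.out.println("minimum wait for a 3000 ms batch: "
                + minimumWaitMs(3000, 500) + " ms");
    }
}
```

With the 3000 ms batch interval from the question, a timeout of two or three batch intervals leaves room for listener setup while still failing the test in bounded time if no output ever arrives.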

maasg answered Nov 14 '22 03:11
