Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Testing Storm Bolts and Spouts

This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.

What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?

For instance, I could write a JUnit test for a Bolt, but without fully understanding the framework (like the lifecycle of a Bolt) and the Serialization implications, easily make the mistake of Constructor-based creation of non-serializable member variables. In JUnit, this test would pass, but in a topology, it wouldn't work. I fully imagine there are many test points one needs to consider (such as this example with Serialization & lifecycle).

Therefore, is it recommended that if you use JUnit based unit tests, you run a small mock topology (LocalMode?) and test the implied contract for the Bolt (or Spout) under that Topology? Or, is it OK to use JUnit, but the implication being that we have to simulate the lifecycle of a Bolt (creating it, calling prepare(), mocking a Config, etc) carefully? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?

What have other developers done, with respect to creating proper unit tests?

I noticed there is a Topology testing API (See: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API, and stand up "Test Topologies" for each individual Bolt & Spout (and verifying the implicit contract that the Bolt has to provide for, eg - it's Declared outputs)?

Thanks

like image 256
Jack Avatar asked May 14 '13 17:05

Jack


People also ask

What are spouts and bolts in Storm?

Spout emits the data to one or more bolts. Bolt represents a node in the topology having the smallest processing logic and the output of a bolt can be emitted into another bolt as input. Storm keeps the topology always running, until you kill the topology.

Which of the following method is used for performing transformation in Bolt Apache Storm?

The main method in bolts is the execute method which takes in as input a new tuple. Bolts emit new tuples using the OutputCollector object.

What is Storm topology?

The Storm topology is basically a Thrift structure. TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set spout (setSpout) and to set bolt (setBolt). Finally, TopologyBuilder has createTopology to create topology.

What is tuple in Storm?

The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed – the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result.


2 Answers

Since version 0.8.1 Storm's unit testing facilities have been exposed via Java:

  • http://storm-project.net/2012/09/06/storm081-released.html

For an example how to use this API have a look here:

  • https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java
like image 185
asmaier Avatar answered Sep 19 '22 13:09

asmaier


Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.

Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.

Bolt

public class NotifyUserBolt extends BaseBasicBolt {   public static final String NAME = "NotifyUser";   private static final String USER_ID_FIELD_NAME = "userId";    private final UserNotifierFactory factory;   transient private UserNotifier notifier;    public NotifyUserBolt(UserNotifierFactory factory) {     checkNotNull(factory);      this.factory = factory;   }    @Override   public void prepare(Map stormConf, TopologyContext context) {     notifier = factory.createUserNotifier();   }    @Override   public void execute(Tuple input, BasicOutputCollector collector) {     // This check ensures that the time-dependency imposed by Storm has been observed     checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");      long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);      notifier.notifyUser(userId);      collector.emit(new Values(userId));   }    @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {     declarer.declare(new Fields(USER_ID_FIELD_NAME));   } } 

Test

public class NotifyUserBoltTest {    private NotifyUserBolt bolt;    @Mock   private TopologyContext topologyContext;    @Mock   private UserNotifier notifier;    // This test implementation allows us to get the mock to the unit-under-test.   private class TestFactory implements UserNotifierFactory {      private final UserNotifier notifier;      private TestFactory(UserNotifier notifier) {       this.notifier = notifier;     }      @Override     public UserNotifier createUserNotifier() {       return notifier;     }   }    @Before   public void before() {     MockitoAnnotations.initMocks(this);      // The factory will return our mock `notifier`     bolt = new NotifyUserBolt(new TestFactory(notifier));     // Now the bolt is holding on to our mock and is under our control!     bolt.prepare(new Config(), topologyContext);   }    @Test   public void testExecute() {     long userId = 24;     Tuple tuple = mock(Tuple.class);     when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);     BasicOutputCollector collector = mock(BasicOutputCollector.class);      bolt.execute(tuple, collector);      // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor     //  the call to execute, too.     verify(notifier).notifyUser(userId);     verify(collector).emit(new Values(userId));   } } 
like image 38
Carl G Avatar answered Sep 22 '22 13:09

Carl G