This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.
What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?
For instance, I could write a JUnit test for a Bolt but, without fully understanding the framework (such as the lifecycle of a Bolt) and the serialization implications, easily make the mistake of creating non-serializable member variables in the constructor. In JUnit this test would pass, but in a topology it wouldn't work. I imagine there are many test points one needs to consider (such as this example with serialization and lifecycle).
Therefore, if you use JUnit-based unit tests, is it recommended to run a small mock topology (LocalMode?) and test the implied contract for the Bolt (or Spout) under that topology? Or is it OK to use plain JUnit, with the implication that we have to carefully simulate the lifecycle of a Bolt (creating it, calling prepare(), mocking a Config, etc.)? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?
What have other developers done, with respect to creating proper unit tests?
I noticed there is a Topology testing API (see: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API and stand up "Test Topologies" for each individual Bolt and Spout (verifying the implicit contract that the Bolt has to provide, e.g. its declared outputs)?
Thanks
A spout emits data to one or more bolts. A bolt represents a node in the topology holding the smallest unit of processing logic, and the output of one bolt can be emitted into another bolt as input. Storm keeps a topology running until you kill it.
The main method in a bolt is execute, which takes a new tuple as input. Bolts emit new tuples using the OutputCollector object.
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple methods for creating complex topologies: setSpout and setBolt add components, and createTopology builds the topology.
The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed – the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result.
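To make "a named list of values" concrete, here is a minimal sketch that uses only the JDK. `NamedTupleSketch` is a hypothetical, simplified model written for illustration; it is not Storm's `Tuple` class and says nothing about how Storm implements it. It only mirrors the idea of looking a value up by field name and having typed helpers do the cast.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplified model of a tuple; NOT Storm's Tuple implementation.
public class NamedTupleSketch {
    private final List<String> fields;  // field names, in declaration order
    private final List<Object> values;  // values of any type, same order as fields

    NamedTupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Untyped access: the caller has to cast the result.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("No such field: " + field);
        }
        return values.get(index);
    }

    // Typed helpers: the cast happens here, so calling code stays clean.
    String getStringByField(String field) {
        return (String) getValueByField(field);
    }

    Integer getIntegerByField(String field) {
        return (Integer) getValueByField(field);
    }

    public static void main(String[] args) {
        NamedTupleSketch tuple = new NamedTupleSketch(
                Arrays.asList("word", "count"),
                Arrays.<Object>asList("storm", 3));
        System.out.println(tuple.getStringByField("word") + " -> " + tuple.getIntegerByField("count"));
        // prints "storm -> 3"
    }
}
```

Because the types are only checked when a typed helper runs, asking for the wrong type fails at runtime with a ClassCastException rather than at compile time, which is one more thing a bolt's unit test can exercise.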
Since version 0.8.1 Storm's unit testing facilities have been exposed via Java:
For an example of how to use this API, have a look here:
Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.
Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotifierFactory because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde (serialize/deserialize) cycle. You should cover serialization of that class.
Bolt
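// Package names below are for Storm 1.0+; releases before 1.0 use backtype.storm instead of org.apache.storm.
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class NotifyUserBolt extends BaseBasicBolt {
    public static final String NAME = "NotifyUser";
    private static final String USER_ID_FIELD_NAME = "userId";

    // Serializable factory: travels with the bolt when the topology is submitted.
    private final UserNotifierFactory factory;
    // Transient: created in prepare(), never serialized.
    transient private UserNotifier notifier;

    public NotifyUserBolt(UserNotifierFactory factory) {
        checkNotNull(factory);
        this.factory = factory;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        notifier = factory.createUserNotifier();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // This check ensures that the time-dependency imposed by Storm has been observed
        checkState(notifier != null,
                "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");

        long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
        notifier.notifyUser(userId);
        collector.emit(new Values(userId));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(USER_ID_FIELD_NAME));
    }
}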
Test
// Package names below are for Storm 1.0+; releases before 1.0 use backtype.storm instead of org.apache.storm.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class NotifyUserBoltTest {
    private NotifyUserBolt bolt;

    @Mock
    private TopologyContext topologyContext;

    @Mock
    private UserNotifier notifier;

    // This test implementation allows us to get the mock to the unit-under-test.
    private class TestFactory implements UserNotifierFactory {
        private final UserNotifier notifier;

        private TestFactory(UserNotifier notifier) {
            this.notifier = notifier;
        }

        @Override
        public UserNotifier createUserNotifier() {
            return notifier;
        }
    }

    @Before
    public void before() {
        MockitoAnnotations.initMocks(this);

        // The factory will return our mock `notifier`
        bolt = new NotifyUserBolt(new TestFactory(notifier));
        // Now the bolt is holding on to our mock and is under our control!
        bolt.prepare(new Config(), topologyContext);
    }

    @Test
    public void testExecute() {
        long userId = 24;
        Tuple tuple = mock(Tuple.class);
        when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
        BasicOutputCollector collector = mock(BasicOutputCollector.class);

        bolt.execute(tuple, collector);

        // Here we just verify a call on `notifier`, but we could have stubbed out behavior before
        // the call to execute, too.
        verify(notifier).notifyUser(userId);
        verify(collector).emit(new Values(userId));
    }
}
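The advice above to cover serialization can be checked without a cluster. The following is a minimal sketch using only the JDK, on the assumption that what matters is surviving a standard Java serialization round trip (which is how Storm ships spouts and bolts to workers). `SerdeCheck`, `EagerComponent` and `LazyComponent` are hypothetical names invented for this illustration; in a real test you would pass your own factory or bolt to a helper like `roundTrip`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerdeCheck {

    /** Java-serializes an object to memory and reads it back, returning the copy. */
    static Object roundTrip(Serializable original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    /** Returns false if the object graph contains something that is not Serializable. */
    static boolean survivesRoundTrip(Serializable candidate) {
        try {
            roundTrip(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Hypothetical stand-in for a bolt that builds a non-serializable member in its constructor. */
    static class EagerComponent implements Serializable {
        private final Thread worker = new Thread(); // Thread is not Serializable
    }

    /** Hypothetical stand-in for a bolt that defers the member to a prepare-style method. */
    static class LazyComponent implements Serializable {
        private transient Thread worker; // transient fields are skipped by serialization

        void prepare() {
            worker = new Thread();
        }

        boolean isPrepared() {
            return worker != null;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("eager survives: " + survivesRoundTrip(new EagerComponent()));
        LazyComponent copy = (LazyComponent) roundTrip(new LazyComponent());
        copy.prepare(); // mirrors prepare() being called after deserialization
        System.out.println("lazy survives and prepares: " + copy.isPrepared());
    }
}
```

Running the same round trip on the bolt itself in a JUnit test, and only then calling prepare on the deserialized copy, would catch the constructor-created member mistake described in the question, which a plain mock-based test does not.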