How to map tuples with persistent state in Trident?

I'm learning the Trident framework. Trident Streams offer several methods for aggregating tuples within a batch, including one that performs a stateful mapping of the tuples using the Aggregator interface. Unfortunately, there is no built-in counterpart that additionally persists the map state: none of the 9 overloads of persistentAggregate() takes an Aggregator as an argument.

So how can I implement the desired functionality by combining lower-level Trident and Storm abstractions and tools? The API is hard to explore because there is almost no Javadoc documentation.

In other words, the persistentAggregate() methods end stream processing by updating some persistent state:

stream of tuples ---> persistent state

I want to update persistent state and emit different tuples along the way:

stream of tuples ------> stream of different tuples
                  with
            persistent state

Stream.aggregate(Fields, Aggregator, Fields) doesn't provide fault tolerance:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state
leventov asked Nov 08 '13 12:11


1 Answer

You can create a new stream from a state using the method TridentState#newValuesStream(). This will allow you to retrieve a stream of the aggregated values.

For illustration, we can extend the example in the Trident documentation by adding this method and a Debug filter:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true);

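TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
    // Split is the sentence-splitting function from the Trident documentation example
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    // update the per-word counts held in the state
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
    // turn the state updates back into a stream and print them
    .newValuesStream().each(new Fields("count"), new Debug());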

Running this topology prints the aggregated counts to the console.
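
To make it more concrete what the stream returned by newValuesStream() is expected to carry, here is a minimal plain-Java sketch with no Storm dependency. It is only a model, not Trident code: the class NewValuesModel and its method applyBatch are hypothetical names, a LinkedHashMap stands in for MemoryMapState, and it assumes that after each batch the stream carries the updated value for every key touched by that batch. It says nothing about Trident's transactional or fault-tolerance guarantees.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of groupBy("word") + persistentAggregate(Count) + newValuesStream().
// Not Trident code: it only mimics the assumed per-batch behaviour in plain Java.
public class NewValuesModel {
    // Stands in for the persistent state (MemoryMapState in the topology above).
    private final Map<String, Long> state = new LinkedHashMap<>();

    // Processes one batch of sentences and returns the (word, updated count)
    // pairs that the values stream is assumed to emit for that batch.
    public List<Map.Entry<String, Long>> applyBatch(List<String> sentences) {
        // Step 1: split the sentences and count each word within the batch.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                batchCounts.merge(word, 1L, Long::sum);
            }
        }
        // Step 2: fold the batch counts into the state and emit the new totals.
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long updated = state.merge(e.getKey(), e.getValue(), Long::sum);
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), updated));
        }
        return emitted;
    }

    public static void main(String[] args) {
        NewValuesModel model = new NewValuesModel();
        // prints [the=2, cow=1, jumped=1, over=1, moon=1]
        System.out.println(model.applyBatch(Arrays.asList("the cow jumped over the moon")));
        // prints [the=4, man=1, went=1, to=1, store=1]
        System.out.println(model.applyBatch(Arrays.asList("the man went to the store")));
    }
}
```

In this model, the second batch emits the=4 rather than the=2, which is the point of the approach: the emitted tuples reflect the accumulated state, not just the current batch.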

Hope it helps

Pierre Merienne answered Oct 21 '22 09:10