What is Trident State in Storm?

I am new to Trident in Storm, and I am breaking my head over TridentState. As far as I understand, Trident maintains state (i.e. metadata) for each batch, tracking whether all the tuples in a batch have been completely processed by storing a transaction id in the database. What I am not entirely sure about is what the following statement does:

TridentState urlToTweeters =
   topology.newStaticState(getUrlToTweetersState());

Can anyone explain what actually happens when we define the above code?

asked Jul 13 '13 at 07:07 by Ezhil





1 Answer

I hope it's never too late to answer; at least somebody else might find my answer useful :)

So, topology.newStaticState() is Trident's abstraction of a queryable data store. Based on the method's contract, its parameter must be an implementation of storm.trident.state.StateFactory. The factory, in turn, implements the makeState() method, which returns an instance of storm.trident.state.State. However, if you plan to query your state, makeState() should return an instance of storm.trident.state.map.ReadOnlyMapState instead, since plain storm.trident.state.State has no methods for querying an actual data source (if you query it with MapGet, you'll actually get a ClassCastException for anything that isn't a ReadOnlyMapState).
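To see where that ClassCastException can come from, here is a Storm-free sketch. State, ReadOnlyMapState, QueryFunction, MapGetSketch, MapBackedState and PlainState are simplified, hypothetical stand-ins written for illustration, not Storm's classes. The sketch assumes the query function pins the state type through a generic parameter, as MapGet's ReadOnlyMapState parameter does; when a caller that only holds a plain State invokes it, the cast inside the compiler-generated bridge method fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaticStateContractSketch {

    // Hypothetical, simplified stand-ins for Trident's State / ReadOnlyMapState.
    interface State {
        void beginCommit(Long txid);
        void commit(Long txid);
    }

    interface ReadOnlyMapState<T> extends State {
        List<T> multiGet(List<List<Object>> keys);
    }

    // Hypothetical query function whose type parameter pins the state type, as MapGet's does.
    interface QueryFunction<S extends State, T> {
        List<T> batchRetrieve(S state, List<List<Object>> keys);
    }

    static class MapGetSketch implements QueryFunction<ReadOnlyMapState<Object>, Object> {
        @Override
        public List<Object> batchRetrieve(ReadOnlyMapState<Object> state, List<List<Object>> keys) {
            return state.multiGet(keys);
        }
    }

    // Queryable state: looks up the first field of each key in an in-memory map.
    static class MapBackedState implements ReadOnlyMapState<Object> {
        private final Map<Object, Object> data = new HashMap<>();

        MapBackedState() {
            data.put("tuple-00", "Trident");
        }

        @Override
        public List<Object> multiGet(List<List<Object>> keys) {
            List<Object> result = new ArrayList<>();
            for (List<Object> key : keys) {
                result.add(data.get(key.get(0)));
            }
            return result;
        }

        @Override public void beginCommit(Long txid) { }
        @Override public void commit(Long txid) { }
    }

    // Plain State: satisfies the factory's return type but offers nothing to query.
    static class PlainState implements State {
        @Override public void beginCommit(Long txid) { }
        @Override public void commit(Long txid) { }
    }

    // A caller that only knows it holds "some State" and calls through the raw type,
    // so the cast to ReadOnlyMapState happens in MapGetSketch's bridge method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<Object> runQuery(QueryFunction function, State state, List<List<Object>> keys) {
        return function.batchRetrieve(state, keys);
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Collections.singletonList(Collections.<Object>singletonList("tuple-00"));
        System.out.println(runQuery(new MapGetSketch(), new MapBackedState(), keys)); // prints [Trident]
        try {
            runQuery(new MapGetSketch(), new PlainState(), keys);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: PlainState is not a ReadOnlyMapState");
        }
    }
}
```

The same function works against the map-backed state and fails against the plain one, even though both are valid return values for a factory typed as returning State.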

So, let's give it a try!

A dummy state implementation:

public static class ExampleStaticState implements ReadOnlyMapState<String> {

    private final Map<String, String> dataSourceStub;

    public ExampleStaticState() {
        dataSourceStub = new HashMap<>();
        dataSourceStub.put("tuple-00", "Trident");
        dataSourceStub.put("tuple-01", "definitely");
        dataSourceStub.put("tuple-02", "lacks");
        dataSourceStub.put("tuple-03", "documentation");
    }

    @Override
    public List<String> multiGet(List<List<Object>> keys) {

        System.out.println("DEBUG: MultiGet, keys is " + keys);

        List<String> result = new ArrayList<>();

        for (List<Object> inputTuple : keys) {
            result.add(dataSourceStub.get(inputTuple.get(0)));
        }

        return result;
    }

    @Override
    public void beginCommit(Long txid) {
        // never called here: this state is only queried, never updated
        System.out.println("DEBUG: Begin commit, txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        // never called here: this state is only queried, never updated
        System.out.println("DEBUG: Commit, txid=" + txid);
    }
}

A factory:

public static class ExampleStaticStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new ExampleStaticState();
    }
}

A simple psvm (aka public static void main):

public static void main(String... args) throws Exception {
    TridentTopology tridentTopology = new TridentTopology();
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
            "foo"
    }));
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
    tridentTopology
            .newStream("spout", spout)
            .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
            .each(new Fields("foo", "bar"), new Debug())
            ;

    Config conf = new Config();
    conf.setNumWorkers(6);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

    spout.feed(Arrays.asList(new Values[]{
            new Values("tuple-00"),
            new Values("tuple-01"),
            new Values("tuple-02"),
            new Values("tuple-03")
    }));

    localCluster.shutdown();
}

And, finally, the output:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]

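You see, stateQuery() takes the "foo" value of every tuple in the input batch, looks it up in the 'data storage' (here with a single multiGet() call for the whole batch), and appends what it finds to the tuple as the new field "bar".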
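A rough, Storm-free model of that stateQuery() call may make the flow clearer. LookupState and stateQuery() here are hypothetical names, not Trident APIs. The sketch assumes a single partition, so the whole batch reaches multiGet() in one call, which matches the single "MultiGet" line in the output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateQuerySketch {

    // Hypothetical in-memory stand-in for ExampleStaticState (not Storm code).
    static class LookupState {
        private final Map<String, String> data = new HashMap<>();
        int multiGetCalls = 0; // counts how many times the "data storage" is hit

        LookupState() {
            data.put("tuple-00", "Trident");
            data.put("tuple-01", "definitely");
            data.put("tuple-02", "lacks");
            data.put("tuple-03", "documentation");
        }

        List<String> multiGet(List<List<Object>> keys) {
            multiGetCalls++;
            List<String> result = new ArrayList<>();
            for (List<Object> key : keys) {
                result.add(data.get(key.get(0)));
            }
            return result;
        }
    }

    // Models stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")):
    // one multiGet per batch, then each result is appended to its input tuple.
    static List<List<Object>> stateQuery(LookupState state, List<List<Object>> batch) {
        List<String> results = state.multiGet(batch);
        List<List<Object>> out = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<Object> tuple = new ArrayList<>(batch.get(i)); // existing field "foo"
            tuple.add(results.get(i));                          // appended field "bar"
            out.add(tuple);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = new ArrayList<>();
        for (String key : Arrays.asList("tuple-00", "tuple-01", "tuple-02", "tuple-03")) {
            batch.add(new ArrayList<Object>(Arrays.asList(key)));
        }
        LookupState state = new LookupState();
        for (List<Object> tuple : stateQuery(state, batch)) {
            System.out.println("DEBUG: " + tuple); // e.g. DEBUG: [tuple-00, Trident]
        }
        System.out.println("multiGet calls: " + state.multiGetCalls); // 1
    }
}
```

With more partitions, the real topology would presumably call multiGet() once per batch per partition rather than once in total.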

Diving a bit deeper, you can take a look at the source of the MapGet class (the guy whose instance is used for querying inside the topology) and find the following:

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
    @Override
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
        return map.multiGet((List) keys);
    }    

    @Override
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
        collector.emit(new Values(result));
    }    
}

So under the hood, it simply calls the multiGet() method of your ReadOnlyMapState implementation and then emits the values found in the data store, appending them to the already existing tuple. You can also (although it might not be the best thing to do) create your own implementation of BaseQueryFunction<ReadOnlyMapState, Object> that does something more complicated.
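As a sketch of what such a custom function might look like, the Storm-free model below mirrors the two steps MapGet implements: batchRetrieve() once per batch, then execute() once per tuple. ReadOnlyLookup, QueryFunction, MapGetSkipMissing, runQuery() and lookupOver() are hypothetical stand-ins, not Storm classes. Unlike MapGet, which emits the result even when it is null, this function emits nothing for a missing key; the sketch assumes, as with other Trident functions, that a tuple producing no emits is dropped from the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomQueryFunctionSketch {

    // Hypothetical stand-in for ReadOnlyMapState: one value per key, null for misses.
    interface ReadOnlyLookup {
        List<Object> multiGet(List<List<Object>> keys);
    }

    // Hypothetical stand-in for the batchRetrieve()/execute() contract MapGet follows.
    interface QueryFunction {
        List<Object> batchRetrieve(ReadOnlyLookup state, List<List<Object>> keys);
        void execute(List<Object> tuple, Object result, List<List<Object>> emitted);
    }

    // Custom function: emits the looked-up value only when the key was found.
    static class MapGetSkipMissing implements QueryFunction {
        @Override
        public List<Object> batchRetrieve(ReadOnlyLookup state, List<List<Object>> keys) {
            return state.multiGet(keys); // same single batched lookup as MapGet
        }

        @Override
        public void execute(List<Object> tuple, Object result, List<List<Object>> emitted) {
            if (result != null) {
                emitted.add(Arrays.asList(result)); // new field values only
            }
            // no emit for a missing key, so the input tuple is dropped
        }
    }

    // Drives the function like stateQuery(): input fields followed by emitted fields.
    static List<List<Object>> runQuery(QueryFunction fn, ReadOnlyLookup state, List<List<Object>> batch) {
        List<Object> results = fn.batchRetrieve(state, batch);
        List<List<Object>> output = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<List<Object>> emitted = new ArrayList<>();
            fn.execute(batch.get(i), results.get(i), emitted);
            for (List<Object> values : emitted) {
                List<Object> tuple = new ArrayList<>(batch.get(i)); // existing fields
                tuple.addAll(values);                               // appended fields
                output.add(tuple);
            }
        }
        return output;
    }

    // Builds an in-memory lookup keyed by the first field of each key tuple.
    static ReadOnlyLookup lookupOver(Map<Object, Object> data) {
        return keys -> {
            List<Object> result = new ArrayList<>();
            for (List<Object> key : keys) {
                result.add(data.get(key.get(0)));
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Map<Object, Object> data = new HashMap<>();
        data.put("tuple-00", "Trident");
        data.put("tuple-02", "lacks");
        List<List<Object>> batch = new ArrayList<>();
        for (String key : Arrays.asList("tuple-00", "tuple-01", "tuple-02")) {
            batch.add(Arrays.<Object>asList(key));
        }
        // prints [[tuple-00, Trident], [tuple-02, lacks]]
        System.out.println(runQuery(new MapGetSkipMissing(), lookupOver(data), batch));
    }
}
```

Keeping the lookup in batchRetrieve() and the per-tuple decision in execute() preserves the batched access pattern that makes multiGet() worthwhile against a real data store.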

answered Sep 20 '22 at 13:09 by bopcat
