I am new to Trident in Storm and I am struggling with TridentState. As far as I understand, Trident maintains state (i.e. metadata) for each batch, tracking whether all the tuples in a batch have been completely processed by keeping a transaction id in the database. I am not entirely sure what the following statement does:
TridentState urlToTweeters =
    topology.newStaticState(getUrlToTweetersState());
Can anyone explain what actually happens when we define the above code?
I hope it's never too late to answer, at least somebody else might find my answer useful :)
So, topology.newStaticState() is Trident's abstraction of a queryable data storage. The parameter for newStaticState() should be, per the method's contract, an implementation of storm.trident.state.StateFactory. The factory, in turn, should implement the makeState() method, which returns an instance of storm.trident.state.State. However, if you plan to query your state, you should return an instance of storm.trident.state.map.ReadOnlyMapState instead, since plain storm.trident.state.State doesn't have methods for querying an actual data source (when querying with MapGet, as in the topology below, you'll actually get a class cast exception if you try to use anything but ReadOnlyMapState).
So, let's give it a try!
A dummy state implementation:
public static class ExampleStaticState implements ReadOnlyMapState<String> {

    private final Map<String, String> dataSourceStub;

    public ExampleStaticState() {
        dataSourceStub = new HashMap<>();
        dataSourceStub.put("tuple-00", "Trident");
        dataSourceStub.put("tuple-01", "definitely");
        dataSourceStub.put("tuple-02", "lacks");
        dataSourceStub.put("tuple-03", "documentation");
    }

    @Override
    public List<String> multiGet(List<List<Object>> keys) {
        System.out.println("DEBUG: MultiGet, keys is " + keys);
        List<String> result = new ArrayList<>();
        for (List<Object> inputTuple : keys) {
            result.add(dataSourceStub.get(inputTuple.get(0)));
        }
        return result;
    }

    @Override
    public void beginCommit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Begin commit, txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Commit, txid=" + txid);
    }
}
A factory:
public static class ExampleStaticStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new ExampleStaticState();
    }
}
A simple psvm (aka public static void main):
public static void main(String... args) {
    TridentTopology tridentTopology = new TridentTopology();

    // The spout emits tuples with a single field named "foo"
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
        "foo"
    }));

    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());

    // Look up each "foo" value in the static state and append the result as "bar"
    tridentTopology
        .newStream("spout", spout)
        .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
        .each(new Fields("foo", "bar"), new Debug())
    ;

    Config conf = new Config();
    conf.setNumWorkers(6);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

    // Feed a single batch of four tuples into the topology
    spout.feed(Arrays.asList(new Values[]{
        new Values("tuple-00"),
        new Values("tuple-01"),
        new Values("tuple-02"),
        new Values("tuple-03")
    }));

    localCluster.shutdown();
}
And, finally, the output:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
You see, stateQuery() gets values from an input batch and maps them to the values found in the 'data storage'.
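To make that mapping a bit more concrete, here is a minimal Storm-free sketch of what happens to one batch. The class StaticLookupSketch and the helper appendResults() are made up for illustration and are not part of Storm; only the shape of multiGet() (one key list per input tuple, one result per key list, same order) is taken from the dummy state above. It also shows what I'd expect for a key that isn't in the 'data storage': the stub's HashMap returns null, so null is what ends up in the tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch; class and helper names here are hypothetical.
class StaticLookupSketch {
    private static final Map<String, String> DATA = new HashMap<>();
    static {
        DATA.put("tuple-00", "Trident");
        DATA.put("tuple-01", "definitely");
    }

    // Same shape as multiGet() in the dummy state: one key list per input
    // tuple, one result per key list, returned in the same order.
    static List<String> multiGet(List<List<Object>> keys) {
        List<String> result = new ArrayList<>();
        for (List<Object> key : keys) {
            result.add(DATA.get(key.get(0))); // null when the key is unknown
        }
        return result;
    }

    // Imitates the query step that follows: result i is appended to tuple i.
    static List<List<Object>> appendResults(List<List<Object>> tuples, List<String> results) {
        List<List<Object>> out = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            List<Object> joined = new ArrayList<>(tuples.get(i));
            joined.add(results.get(i));
            out.add(joined);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
            Arrays.<Object>asList("tuple-00"),
            Arrays.<Object>asList("tuple-01"),
            Arrays.<Object>asList("missing"));
        // prints [[tuple-00, Trident], [tuple-01, definitely], [missing, null]]
        System.out.println(appendResults(batch, multiGet(batch)));
    }
}
```

The important bit is the index alignment: the result list has to be as long as the key list, otherwise there is no way to tell which result belongs to which tuple.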
Diving a bit deeper, you can take a look at the source of the MapGet class (the guy whose instance is used for querying inside the topology) and find the following there:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
    @Override
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
        return map.multiGet((List) keys);
    }

    @Override
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
        collector.emit(new Values(result));
    }
}
So under the hood it simply calls the multiGet() method of your ReadOnlyMapState implementation and then emits the values found in the data storage, adding them to the already existing tuple. You can (although it might not be the best thing to do) create your own implementation of BaseQueryFunction<ReadOnlyMapState, Object> that does something more complicated.
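As a rough idea of what "more complicated" could look like, here is a Storm-free sketch of the same two-phase flow: one batchRetrieve() call per batch, then one execute() call per tuple, where execute() substitutes a fallback when nothing was found. Everything in it (QueryFunctionSketch, the Lookup interface, the run() driver, the fallback parameter) is a hypothetical stand-in I made up to keep it runnable without Storm. In a real topology this logic would live in your own BaseQueryFunction subclass and emit through the TridentCollector instead of returning a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch: every type below is a hypothetical stand-in, not Storm's API.
class QueryFunctionSketch {
    // Stand-in for a read-only state that only supports batch lookups.
    interface Lookup {
        List<String> multiGet(List<List<Object>> keys);
    }

    // Phase 1: called once per batch, returns one result per input tuple.
    static List<String> batchRetrieve(Lookup state, List<List<Object>> tuples) {
        return state.multiGet(tuples);
    }

    // Phase 2: called once per tuple; the custom part is the fallback value.
    static List<Object> execute(List<Object> tuple, String result, String fallback) {
        List<Object> out = new ArrayList<>(tuple);
        out.add(result != null ? result : fallback);
        return out;
    }

    // Hypothetical driver that pairs tuple i with result i.
    static List<List<Object>> run(Lookup state, List<List<Object>> tuples, String fallback) {
        List<String> results = batchRetrieve(state, tuples);
        List<List<Object>> emitted = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            emitted.add(execute(tuples.get(i), results.get(i), fallback));
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("tuple-00", "Trident");
        Lookup state = keys -> {
            List<String> found = new ArrayList<>();
            for (List<Object> key : keys) {
                found.add(data.get(key.get(0)));
            }
            return found;
        };
        List<List<Object>> batch = Arrays.asList(
            Arrays.<Object>asList("tuple-00"),
            Arrays.<Object>asList("unknown"));
        // prints [[tuple-00, Trident], [unknown, n/a]]
        System.out.println(run(state, batch, "n/a"));
    }
}
```

Keeping the lookup in the per-batch phase and the custom logic in the per-tuple phase means the data source is still hit once per batch, not once per tuple.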