I'm implementing an IBackingMap for my Trident topology to store tuples to ElasticSearch (I know there are several implementations for Trident/ElasticSearch integration already existing at GitHub however I've decided to implement a custom one which suits my task better).
So my implementation is a classic one with a factory:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
You see it gets host/port/cluster name as input params and creates an ElasticSearch client as a member of the class BUT IT NEVER CLOSES THE CLIENT.
It is then used from within a topology in a pretty familiar way:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
This topology is wrapped into some public static void main, packed in a jar and sent to Storm for execution.
The question is, should I worry about closing the ElasticSearch connection or it is Storm's own business? If it is not done by Storm, how and when in the topology's lifecycle I should do that?
Thanks in advance!
Okay, answering my own question.
First of all, thanks again @dedek for suggestions and reviving the ticket in Storm's Jira.
Finally, since there's no official way to do that, I've decided to go for cleanup() method of Trident's Filter. So far I've verified the following (for Storm v. 0.9.4):
With LocalCluster
With a real cluster
In overall that gives more or less decent guarantee of cleanup() to get called, provided you'll be careful with exception handling (I tend to add 'thundercatches' to every of my Trident primitives anyway).
My code:
public class CloseFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);
private final Closeable[] closeables;
public CloseFilter(Closeable... closeables) {
this.closeables = closeables;
}
@Override
public boolean isKeep(TridentTuple tuple) {
return true;
}
@Override
public void prepare(Map conf, TridentOperationContext context) {
}
@Override
public void cleanup() {
for (Closeable c : closeables) {
try {
c.close();
} catch (Exception e) {
LOG.warn("Failed to close an instance of {}", c.getClass(), e);
}
}
}
}
However would be nice if some day hooks for closing connections become a part of the API.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With