How to close a database connection opened by an IBackingMap implementation within a Storm Trident topology?

I'm implementing an IBackingMap for my Trident topology to store tuples in ElasticSearch. (I know there are already several implementations of Trident/ElasticSearch integration on GitHub; however, I've decided to implement a custom one that suits my task better.)

My implementation follows the classic pattern with a factory:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO: provide a way to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}

As you can see, it takes the host, port and cluster name as input parameters and creates an ElasticSearch client as a member of the class, but it NEVER closes the client.
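A minimal sketch of how the TODO in the constructor could be addressed, assuming the backing map simply owns its client and exposes a way to close it. To keep the example self-contained, the ElasticSearch client is replaced by a plain AutoCloseable stand-in; the class name ClientHolder is hypothetical and not part of Storm or ElasticSearch. Closing is made idempotent so that a repeated close call is harmless.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder: owns one client (any AutoCloseable stand-in) and
// closes it at most once, no matter how often close() is called.
final class ClientHolder<C extends AutoCloseable> implements Closeable {

    private final C client;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    ClientHolder(C client) {
        this.client = client;
    }

    // Hands out the client only while it is still open.
    C get() {
        if (closed.get()) {
            throw new IllegalStateException("client already closed");
        }
        return client;
    }

    @Override
    public void close() throws IOException {
        // compareAndSet lets only the first caller reach client.close()
        if (closed.compareAndSet(false, true)) {
            try {
                client.close();
            } catch (IOException e) {
                throw e;
            } catch (Exception e) {
                // AutoCloseable.close() may throw any Exception; wrap it for Closeable
                throw new IOException("failed to close client", e);
            }
        }
    }
}
```

In the question's class, the TransportClient would take the place of the stand-in, and the backing map (or whatever ends up owning it) would call close() on shutdown.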

The backing map is then used from within a topology in a pretty familiar way:

tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));

This topology is wrapped in a public static void main, packaged into a jar and submitted to Storm for execution.

The question is: should I worry about closing the ElasticSearch connection, or is that Storm's own business? If Storm doesn't do it, how and when in the topology's lifecycle should I do it?

Thanks in advance!

bopcat asked Feb 26 '15 at 13:02


1 Answer

Okay, answering my own question.

First of all, thanks again to @dedek for the suggestions and for reviving the ticket in Storm's Jira.

In the end, since there's no official way to do this, I've decided to use the cleanup() method of a Trident Filter. So far I've verified the following (for Storm 0.9.4):

With LocalCluster

  • cleanup() gets called on cluster's shutdown
  • cleanup() does NOT get called when the topology is killed; this shouldn't be a tragedy, since one very likely won't use LocalCluster for real deployments anyway

With a real cluster

  • it gets called when the topology is killed as well as when the worker is stopped using pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
  • it does not get called if the worker is killed with kill -9, when it crashes, or (sadly) when the worker dies due to an exception

Overall, that gives a more or less decent guarantee that cleanup() gets called, provided you are careful with exception handling (I tend to add 'thundercatches' to each of my Trident primitives anyway).
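To illustrate the 'thundercatch' idea, here is a minimal sketch, assuming it means a catch-all around each per-tuple operation so that a failure is logged and the tuple is skipped, rather than the exception killing the worker (in which case, as noted above, cleanup() is not called). The helper name ThunderCatch is hypothetical, and plain java.util.function types stand in for Trident's Function/Filter interfaces so the example runs without Storm on the classpath.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: runs per-tuple logic inside a catch-all ("thundercatch")
// so an unexpected exception is logged instead of propagating and killing the worker.
final class ThunderCatch {

    private static final Logger LOG = Logger.getLogger(ThunderCatch.class.getName());

    private ThunderCatch() {
    }

    // Returns the body's result, or Optional.empty() if it threw (or returned null).
    static <T, R> Optional<R> apply(Function<T, R> body, T input) {
        try {
            return Optional.ofNullable(body.apply(input));
        } catch (RuntimeException e) {
            LOG.log(Level.WARNING, "Swallowed exception while processing " + input, e);
            return Optional.empty();
        }
    }
}
```

Inside a real Trident operation, the same try/catch would wrap the body of execute() or isKeep(), emitting nothing (or returning false) when the body fails.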

My code:

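import java.io.Closeable;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.operation.Filter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

// Pass-through filter whose only job is to close the given resources when Storm calls cleanup()
public class CloseFilter implements Filter {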

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);

    private final Closeable[] closeables;

    public CloseFilter(Closeable... closeables) {
        this.closeables = closeables;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    @Override
    public void cleanup() {
        for (Closeable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an instance of {}", c.getClass(), e);
            }
        }
    }
}
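One practical detail: Trident operations are Java-serialized when the topology is submitted, so whatever is passed to the CloseFilter constructor is serialized along with it, while the ElasticSearch client is only created later, inside makeState() on the worker. A minimal sketch of one way to bridge that gap, assuming a per-worker-JVM static registry (WorkerCloseables and RegistryCloser are hypothetical names, not Storm API): makeState() would register a Closeable for each client it creates, and a serializable RegistryCloser would be handed to CloseFilter. Only java.io and java.util types are used, so it runs without Storm.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-JVM registry: state factories register the resources they
// open, and whoever runs cleanup() closes them all.
final class WorkerCloseables {

    private static final List<Closeable> OPEN = new ArrayList<>();

    private WorkerCloseables() {
    }

    static synchronized void register(Closeable c) {
        OPEN.add(c);
    }

    // Closes every registered resource, then empties the registry;
    // returns how many resources failed to close.
    static synchronized int closeAll() {
        int failures = 0;
        for (Closeable c : OPEN) {
            try {
                c.close();
            } catch (IOException | RuntimeException e) {
                failures++;
            }
        }
        OPEN.clear();
        return failures;
    }
}

// Serializable handle that can be passed to CloseFilter at topology-build time.
// It holds no state; close() delegates to the registry of whichever JVM it runs in.
final class RegistryCloser implements Closeable, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public void close() throws IOException {
        int failures = WorkerCloseables.closeAll();
        if (failures > 0) {
            throw new IOException(failures + " resource(s) failed to close");
        }
    }
}
```

Because the registry is shared by the whole worker JVM, the first cleanup() there closes every registered client. That is acceptable at shutdown, but it would be the wrong choice if individual tasks could be cleaned up while others in the same worker keep running.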

However, it would be nice if hooks for closing connections became part of the API some day.

bopcat answered Dec 07 '22 at 09:12