Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Failing to write offset data to zookeeper in kafka-storm

I was setting up a storm cluster to calculate real time trending and other statistics, however I have some problems introducing the "recovery" feature into this project, by allowing the offset that was last read by the kafka-spout (the source code for kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to be remembered. I start my kafka-spout in this way:

BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

The default settings should be doing this, but I think it is not doing so in my case, every time I start my project, the PartitionManager tries to look for the file with the offsets, then nothing is found:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

Then it starts reading from the latest possible offset. Which is okay if my project never fails, but not exactly what I wanted.

I also looked a bit more into the PartitionManager class which uses Zkstate class to write the offsets, from this code snippet:

PartitionManeger

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}

ZkState

public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

I could see that for the first message, the writeBytes method gets into the if block and tries to create a path, then for the second message it goes into the else block, which seems to be ok. But when I start the project again, the same message as mentioned above shows up. No partition information can be found.

like image 816
Juto Avatar asked Jun 25 '14 11:06

Juto


1 Answers

I had the same problem. Turned out I was running in local mode which uses an in memory zookeeper and not the zookeeper that Kafka is using.

To make sure that KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offset, you need to set the SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;

...

    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
    final Integer zkPort = serverInetAddresses.get(0).getPort();
    for (InetSocketAddress serverInetAddress : serverInetAddresses) {
        serverAddresses.add(serverInetAddress.getHostName());
    }

    final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

    spoutConfig.zkServers = serverAddresses;
    spoutConfig.zkPort = zkPort;
    spoutConfig.zkRoot = kafkaZnode;
like image 145
Anthony Dotterer Avatar answered Oct 27 '22 20:10

Anthony Dotterer