I am setting up a Storm cluster to calculate real-time trending and other statistics, but I am having trouble adding a "recovery" feature to the project, that is, having the offset last read by the kafka-spout remembered across restarts (the source code for kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka). I start my kafka-spout this way:
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
The default settings should already do this, but that does not seem to happen in my case. Every time I start my project, the PartitionManager looks for the stored offsets and finds nothing:
2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset
It then starts reading from the latest available offset, which is fine if my project never fails, but is not what I want.
I also looked into the PartitionManager class, which uses the ZkState class to write the offsets, as in these code snippets:
PartitionManager
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);
        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
ZkState
public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
I can see that for the first message the writeBytes method enters the if block and creates the path, and for the second message it goes into the else block, which seems fine. But when I start the project again, the same message as above shows up: no partition information can be found.
I had the same problem. It turned out I was running in local mode, which uses an in-memory ZooKeeper rather than the ZooKeeper that Kafka is using.
To make sure that KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offset, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example:
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
...
// zkConnectStr, kafkaZnode, inputTopic, kafkaConsumerGroup and
// inputKafkaKeyValueScheme are defined elsewhere in the application.
final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
// SpoutConfig takes a single port, so use the port of the first server.
final Integer zkPort = serverInetAddresses.get(0).getPort();
for (InetSocketAddress serverInetAddress : serverInetAddresses) {
    serverAddresses.add(serverInetAddress.getHostName());
}
final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;
final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);
// Point the offset store at Kafka's ZooKeeper instead of Storm's.
spoutConfig.zkServers = serverAddresses;
spoutConfig.zkPort = zkPort;
spoutConfig.zkRoot = kafkaZnode;
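If you would rather not depend on ZooKeeper's ConnectStringParser just to split the connect string, here is a rough stdlib-only sketch of the same idea. The class name ZkConnectString and the sample string "zk1:2181,zk2:2181/kafka" are made up for illustration, it assumes port 2181 when none is given, and it does not handle IPv6 literals. Like the snippet above, it keeps only the first server's port because SpoutConfig has a single zkPort.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of storm-kafka or ZooKeeper.
public class ZkConnectString {
    final List<String> hosts = new ArrayList<>(); // host names only, for zkServers
    final int port;                               // port of the first server, for zkPort
    final String chroot;                          // e.g. "/kafka", or "" when absent

    ZkConnectString(String connectStr) {
        // Everything from the first '/' onward is the chroot path.
        int slash = connectStr.indexOf('/');
        String hostPart = slash >= 0 ? connectStr.substring(0, slash) : connectStr;
        chroot = slash >= 0 ? connectStr.substring(slash) : "";

        int firstPort = -1;
        for (String entry : hostPart.split(",")) {
            String[] hostAndPort = entry.trim().split(":");
            hosts.add(hostAndPort[0]);
            // Assumption: fall back to 2181 when no port is given.
            int p = hostAndPort.length > 1 ? Integer.parseInt(hostAndPort[1]) : 2181;
            if (firstPort < 0) {
                firstPort = p;
            }
        }
        port = firstPort;
    }

    public static void main(String[] args) {
        ZkConnectString parsed = new ZkConnectString("zk1:2181,zk2:2181/kafka");
        System.out.println(parsed.hosts + " " + parsed.port + " " + parsed.chroot);
    }
}
```

The hosts list and port would then go into spoutConfig.zkServers and spoutConfig.zkPort in the same way as above.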