
kafka embedded : java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp

I use KafkaEmbedded in an integration test and get a FileNotFoundException:

java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp 
at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:43) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:58) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1118) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.11.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) [scala-library-2.11.11.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.11.jar:na]
at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:211) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) [kafka_2.11-0.11.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_141]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]

My tests pass, but I get this error at the end of my build.

After many hours of research I found this:

  • Kafka's TestUtils.tempDirectory method creates the temporary directory for the embedded Kafka broker. It also registers a shutdown hook that deletes this directory when the JVM exits.
  • When the unit tests finish executing, System.exit is called, which in turn runs all registered shutdown hooks.

If the Kafka broker is still running at the end of the unit tests, it tries to read or write data in a directory that has already been deleted, which produces various FileNotFoundExceptions.
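The failure mode described above can be reproduced without Kafka. The following is a minimal sketch using only the JDK: it deletes a temporary directory by hand (standing in for the shutdown hook) and then attempts the same kind of write that the broker's checkpoint task performs. The class name, method name, and the "kafka-demo-" prefix are made up for illustration; this is not Kafka's own code.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical demo class, not part of Kafka or spring-kafka.
class DeletedTempDirDemo {

    /** Returns true if a write into an already deleted directory fails with FileNotFoundException. */
    static boolean writeFailsAfterDelete() {
        try {
            File dir = Files.createTempDirectory("kafka-demo-").toFile();
            File checkpoint = new File(dir, "replication-offset-checkpoint.tmp");

            // While the directory exists, the write succeeds.
            try (FileOutputStream out = new FileOutputStream(checkpoint)) {
                out.write(0);
            }

            // Stand-in for the shutdown hook: remove the file, then the directory.
            Files.delete(checkpoint.toPath());
            Files.delete(dir.toPath());

            // A late write has no parent directory left to create the file in.
            try (FileOutputStream out = new FileOutputStream(checkpoint)) {
                out.write(0);
                return false;
            } catch (FileNotFoundException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("write failed after delete: " + writeFailsAfterDelete());
    }
}
```

The second FileOutputStream call fails in the same frame as the top of the stack trace (FileOutputStream.open), which is consistent with the directory having been removed before the broker's scheduled checkpoint ran.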

My config class:

@Configuration
public class KafkaEmbeddedConfiguration {

    private final KafkaEmbedded kafkaEmbedded;

    // The constructor name must match the class name.
    public KafkaEmbeddedConfiguration() throws Exception {
        kafkaEmbedded = new KafkaEmbedded(1, true, "topic1");
        kafkaEmbedded.before();
    }

    @Bean
    public KafkaTemplate<String, Message> sender(ProtobufSerializer protobufSerializer,
            KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) throws Exception {
        KafkaTemplate<String, Message> sender = KafkaTestUtils.newTemplate(kafkaEmbedded, new StringSerializer(),
                protobufSerializer);
        // Wait until every listener container has its partitions assigned.
        for (MessageListenerContainer listenerContainer :
                kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(listenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }

        return sender;
    }
}

Test class:

@RunWith(SpringRunner.class)
public class DeviceEnergyKafkaListenerIT {
    ...
    @Autowired
    private KafkaTemplate<String, Message> sender;

    @Test
    public void test() {
        ...
        sender.send(topic, msg);
        sender.flush();
    }
}

Any ideas on how to resolve this?

asked Feb 27 '18 10:02 by qasmi


2 Answers

With a @ClassRule broker, add an @AfterClass method...

@AfterClass
public static void tearDown() {
    embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
    embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
}

For a @Rule or bean, use an @After method.
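The point of calling shutdown() and then awaitShutdown() is ordering: the background writer has to be fully stopped before its directory disappears. As a rough JDK-only analogue (not Kafka's implementation), the sketch below uses a ScheduledExecutorService as a stand-in for the broker's periodic checkpoint task, stops it, waits for termination, and only then deletes the directory. All names here are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Kafka or spring-kafka.
class OrderedShutdownDemo {

    /** Runs a periodic writer, stops it, then deletes its directory; returns the number of failed writes. */
    static int failedWritesWithOrderedShutdown() {
        AtomicInteger failures = new AtomicInteger();
        try {
            File dir = Files.createTempDirectory("kafka-demo-").toFile();
            File checkpoint = new File(dir, "checkpoint.tmp");

            // Stand-in for the broker's periodic checkpoint task.
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                try (FileOutputStream out = new FileOutputStream(checkpoint)) {
                    out.write(0);
                } catch (IOException e) {
                    failures.incrementAndGet();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);

            Thread.sleep(100); // let a few writes happen

            // Analogue of shutdown() followed by awaitShutdown().
            scheduler.shutdown();
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writer did not stop in time");
            }

            // Only now is it safe to remove the directory.
            Files.deleteIfExists(checkpoint.toPath());
            Files.delete(dir.toPath());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed writes: " + failedWritesWithOrderedShutdown());
    }
}
```

Because the writer has terminated before the delete, no write can land in a missing directory. Swapping the order (delete first, stop later) would reintroduce the race from the question.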

answered Nov 19 '22 16:11 by Gary Russell


final KafkaServer server =
        embeddedKafka.getKafkaServers().stream().findFirst().orElse(null);
if (server != null) {
    // Stop the replica manager without writing a final checkpoint.
    server.replicaManager().shutdown(false);
    // Clear the private field via reflection so it is not used again.
    final Field replicaManagerField = server.getClass().getDeclaredField("replicaManager");
    if (replicaManagerField != null) {
        replicaManagerField.setAccessible(true);
        replicaManagerField.set(server, null);
    }
}
embeddedKafka.after();

For a more detailed discussion, see the thread "Embedded kafka issue with multiple tests using the same context".

answered Nov 19 '22 15:11 by pannu