Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Cloud Kafka Stream Unable to create Producer Config Error

I have two Spring boot project with Kafka-stream dependencies, they have exactly same dependencies in gradle and exactly same configurations, yet one of the project when started logs error as below

11:35:37.974 [restartedMain] INFO  o.a.k.c.admin.AdminClientConfig - AdminClientConfig values: 
    bootstrap.servers = [192.169.0.109:6667]
    client.id = client
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

11:35:38.017 [restartedMain] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
11:35:38.017 [restartedMain] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
11:35:38.136 [restartedMain] INFO  o.s.c.s.b.k.p.KafkaTopicProvisioner - Using kafka topic for outbound: createschedule
11:36:08.147 [restartedMain] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:243)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:126)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:71)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:140)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:136)
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:244)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:221)
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:252)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:46)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:47)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:29)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:157)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:121)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:884)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1246)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1234)
    at com.und.ClientPanelApplicationKt.main(ClientPanelApplication.kt:21)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:271)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:251)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:236)
    ... 34 common frames omitted

I am using kotlin 2.30, and Spring Release 2.0.0.0 Relevant part of my build gradle is below

ext {
    kotlinVersion = '1.2.30'
    springBootVersion = '2.0.0.RELEASE'
    springCloudVersion = 'Finchley.M8'
}

dependencies{
  compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
}

I have defined one OutPut channel as below

interface EventStream {

    @Output("createschedule")
    fun outputEvent(): MessageChannel

}

Below is relvant section of my code where i have defined sender and listener

@Service
class TestService {



    @Autowired
    private lateinit var eventStream: EventStream

    fun test() {
        processSchedule("Hello")
    }






    @SendTo("createschedule")
    fun processSchedule(campaign: String): String {
        return campaign
    }

    @StreamListener("createschedule")
    fun listenSchedule(campaign: String) {
        println(campaign)
        //return campaign
    }

}

Below is relevant section of my application.yaml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${KAFKA_IP}
          defaultBrokerPort: ${KAFKA_PORT}
          zkNodes: ${ZK_IP}
          defaultZkPort: ${ZK_PORT}
        bindings:
          createschedule:
            group: createscheduleGroup
            destination: createschedule
            consumer:
              group: createscheduleGroup



  kafka:
    admin:
      properties:
        #security.protocol: SSL
        client.id: client

As i already stated it throw error while starting and after it starts it keep throwing below error in logs

11:54:38.231 [pool-3-thread-1] INFO  o.s.c.s.b.k.p.KafkaTopicProvisioner - Using kafka topic for outbound: createschedule
11:54:59.070 [kafka-admin-client-thread | client-panel] WARN  o.apache.kafka.clients.NetworkClient - [AdminClient clientId=client-panel] Connection to node -1 could not be established. Broker may not be available.
11:55:08.234 [pool-3-thread-1] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:243)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:126)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:71)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:140)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:136)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$2(BindingService.java:262)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:271)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:251)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:236)
    ... 16 common frames omitted
like image 329
no name Avatar asked Mar 22 '18 06:03

no name


1 Answers

It is the problem with kafka. If you are using docker cointainers, then stop kafka, remove kafka and start kafka cointainer.

like image 152
Dheeraj Avatar answered Oct 23 '22 06:10

Dheeraj