Kafka inaccessible once inside Kubernetes/Minikube

I need to run Kafka in a local Kubernetes instance (using Minikube) and to have the resulting Kafka service accessible to client applications (publishers and subscribers) outside the Minikube VM.

I have everything up and running in Minikube, but I suppose I have made a configuration mistake since I cannot access Kafka from outside. I have read similar questions and tried their suggested solutions, but none of them solved the issue for me.

I have posted my YAML configuration files at https://github.com/thomasleplus/docker-kafka as well as the shell script that I am using to start the whole thing on my Ubuntu machine. I would really appreciate it if someone could help me spot what I have missed.

Here's my configuration so far:

$ kubectl describe service kafka-service
Name:           kafka-service
Namespace:      default
Labels:         run=kafka
Annotations:        <none>
Selector:       run=kafka
Type:           NodePort
Port:           kafka-port  30123/TCP
NodePort:       kafka-port  30123/TCP
Session Affinity:   None
Events:         <none>

$ kubectl describe deployment kafka-deployment
Name:           kafka-deployment
Namespace:      default
CreationTimestamp:  Thu, 17 Aug 2017 20:42:51 -0700
Labels:         run=kafka
Annotations:        deployment.kubernetes.io/revision=1
Selector:       run=kafka
Replicas:       1 desired | 1 updated | 1 total | 1 available | 0 unavailable
StrategyType:       RollingUpdate
MinReadySeconds:    0
RollingUpdateStrategy:  1 max unavailable, 1 max surge
Pod Template:
  Labels:   run=kafka
    Image:  wurstmeister/kafka
    Port:   9092/TCP
      KAFKA_ADVERTISED_PORT:        30123
      KAFKA_BROKER_ID:          1
      KAFKA_ZOOKEEPER_CONNECT:      zookeeper-service:2181
    Mounts:             <none>
  Volumes:              <none>
  Type      Status  Reason
  ----      ------  ------
  Available     True    MinimumReplicasAvailable
OldReplicaSets: <none>
NewReplicaSet:  kafka-deployment-2817439001 (1/1 replicas created)
  FirstSeen LastSeen    Count   From            SubObjectPath   Type        Reason          Message
  --------- --------    -----   ----            -------------   --------    ------          -------
  15m       15m     1   deployment-controller           Normal      ScalingReplicaSet   Scaled up replica set kafka-deployment-2817439001 to 1

The logs:

waiting for kafka to be ready
[2017-08-18 04:31:00,296] INFO KafkaConfig values: 
    advertised.host.name = null
    advertised.listeners = PLAINTEXT://
    advertised.port = null
    alter.config.policy.class.name = null
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 1
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    create.topic.policy.class.name = null
    default.replication.factor = 1
    delete.records.purgatory.purge.interval.requests = 1
    delete.topic.enable = false
    fetch.purgatory.purge.interval.requests = 1000
    group.initial.rebalance.delay.ms = 0
    group.max.session.timeout.ms = 300000
    group.min.session.timeout.ms = 6000
    host.name = 
    inter.broker.listener.name = null
    inter.broker.protocol.version = 0.11.0-IV2
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listeners = PLAINTEXT://:9092
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /kafka/kafka-logs-kafka-deployment-2817439001-tqbjq
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.flush.start.offset.checkpoint.interval.ms = 60000
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.format.version = 0.11.0-IV2
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides = 
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 1
    num.recovery.threads.per.data.dir = 1
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 1440
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 1
    offsets.topic.segment.bytes = 104857600
    port = 9092
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    producer.purgatory.purge.interval.requests = 1000
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism.inter.broker.protocol = GSSAPI
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
    transaction.max.timeout.ms = 900000
    transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
    transaction.state.log.load.buffer.size = 5242880
    transaction.state.log.min.isr = 1
    transaction.state.log.num.partitions = 50
    transaction.state.log.replication.factor = 1
    transaction.state.log.segment.bytes = 104857600
    transactional.id.expiration.ms = 604800000
    unclean.leader.election.enable = false
    zookeeper.connect = zookeeper-service:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
[2017-08-18 04:31:00,436] INFO starting (kafka.server.KafkaServer)
[2017-08-18 04:31:00,439] INFO Connecting to zookeeper on zookeeper-service:2181 (kafka.server.KafkaServer)
[2017-08-18 04:31:00,467] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-08-18 04:31:00,472] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,472] INFO Client environment:host.name=kafka-deployment-2817439001-tqbjq (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.version=1.8.0_131 (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.home=/opt/jdk1.8.0_131/jre (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.class.path=:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/commons-lang3-3.5.jar:/opt/kafka/bin/../libs/connect-api- (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,473] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,474] INFO Client environment:os.version=4.9.13 (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,474] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,474] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,474] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,475] INFO Initiating client connection, connectString=zookeeper-service:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6d2a209c (org.apache.zookeeper.ZooKeeper)
[2017-08-18 04:31:00,500] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-08-18 04:31:00,505] INFO Opening socket connection to server zookeeper-service.default.svc.cluster.local/ Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-08-18 04:31:00,516] INFO Socket connection established to zookeeper-service.default.svc.cluster.local/, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-08-18 04:31:00,558] INFO Session establishment complete on server zookeeper-service.default.svc.cluster.local/, sessionid = 0x15df39b70410000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-08-18 04:31:00,560] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2017-08-18 04:31:00,682] INFO Cluster ID = V2Mj7cI3SMG_VoQxmtb9Tw (kafka.server.KafkaServer)
[2017-08-18 04:31:00,685] WARN No meta.properties file under dir /kafka/kafka-logs-kafka-deployment-2817439001-tqbjq/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-08-18 04:31:00,727] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-08-18 04:31:00,727] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-08-18 04:31:00,728] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-08-18 04:31:00,801] INFO Log directory '/kafka/kafka-logs-kafka-deployment-2817439001-tqbjq' not found, creating it. (kafka.log.LogManager)
[2017-08-18 04:31:00,810] INFO Loading logs. (kafka.log.LogManager)
[2017-08-18 04:31:00,819] INFO Logs loading complete in 9 ms. (kafka.log.LogManager)
[2017-08-18 04:31:00,891] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-08-18 04:31:00,899] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-08-18 04:31:00,960] INFO Awaiting socket connections on (kafka.network.Acceptor)
[2017-08-18 04:31:00,965] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-08-18 04:31:00,982] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:00,985] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:00,989] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:01,089] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:01,090] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-08-18 04:31:01,101] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-08-18 04:31:01,101] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:01,128] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-08-18 04:31:01,164] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2017-08-18 04:31:01,170] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-08-18 04:31:01,178] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 11 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-08-18 04:31:01,200] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2017-08-18 04:31:01,259] INFO [Transaction Coordinator 1]: Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-08-18 04:31:01,277] INFO [Transaction Coordinator 1]: Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-08-18 04:31:01,277] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2017-08-18 04:31:01,335] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-08-18 04:31:01,374] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-08-18 04:31:01,378] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-08-18 04:31:01,381] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(,30123,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-08-18 04:31:01,383] WARN No meta.properties file under dir /kafka/kafka-logs-kafka-deployment-2817439001-tqbjq/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-08-18 04:31:01,394] INFO Kafka version : (org.apache.kafka.common.utils.AppInfoParser)
[2017-08-18 04:31:01,394] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
[2017-08-18 04:31:01,395] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2017-08-18 04:41:01,167] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
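
For readers comparing their own logs: the address a broker registers in ZooKeeper is the one clients are handed after they bootstrap, so the `Registered broker` line is worth inspecting. Below is a minimal sketch, assuming Python 3 and the log format shown above; `registered_endpoint` and `Endpoint` are hypothetical names introduced only for this illustration.

```python
import re
from typing import NamedTuple, Optional


class Endpoint(NamedTuple):
    host: str
    port: int
    listener: str


# Matches the EndPoint(...) part of Kafka's "Registered broker" log line.
_ENDPOINT_RE = re.compile(r"EndPoint\(([^,]*),(\d+),ListenerName\((\w+)\)")


def registered_endpoint(log_line: str) -> Optional[Endpoint]:
    """Extract the advertised host, port and listener name from a log line.

    Returns None when the line contains no EndPoint(...) entry.
    """
    match = _ENDPOINT_RE.search(log_line)
    if match is None:
        return None
    host, port, listener = match.groups()
    return Endpoint(host=host, port=int(port), listener=listener)
```

If the extracted host is empty or is not reachable from the client machine, a client may be able to bootstrap and still fail on later requests.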

Most answers to similar questions recommend using the service type NodePort, as I do, and using port/targetPort/nodePort to map Kafka's default port 9092 to an exposable port (I chose 30123).

$ minikube service kafka-service --url

$ nmap -p 30123

Starting Nmap 7.40 ( https://nmap.org ) at 2017-08-17 20:43 PDT
Nmap scan report for
Host is up (0.00036s latency).
30123/tcp open  unknown

Nmap done: 1 IP address (1 host up) scanned in 13.06 seconds
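
The same reachability check can be scripted. This is a minimal sketch, assuming Python 3 and only the standard library; `port_is_open` is a hypothetical helper name, and the host to pass in would be whatever address Minikube reports for the VM.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and opens a TCP connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

An open port only shows that the NodePort forwards TCP traffic. It does not show that the broker advertises an address the client can actually reach.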

In the end, it looks like should be the way to access Kafka from outside Minikube (so that's what I've put in KAFKA_ADVERTISED_HOST_NAME and KAFKA_ADVERTISED_PORT) yet clients can't connect using these:

$ kafkacat -C -b -t demo
% ERROR: Topic demo error: Broker: Leader not available

Finally some answers mention potential firewall interference so I have tried disabling my machine's firewall but it didn't change anything. If I need to disable the firewall inside the Minikube VM, I am not sure how to do that.

Any help would be greatly appreciated.

Thomas Leplus

2 Answers

The problem is to do with a bug in recent versions of minikube (see https://github.com/kubernetes/minikube/issues/1690).

The solution is simply:

minikube ssh sudo ip link set docker0 promisc on

Fab


I know that this is an old post, but I faced a similar "Leader not available" issue with an almost identical kafka + zookeeper deployment and service within minikube. The crux of this issue is that zookeeper is not able to talk to kafka within minikube.

The easiest fix for you here would be to revert to an earlier compatible version of minikube: v0.17.1 (that uses kubernetes version v1.5.3). If this is not an option for you, the following workarounds worked for me in minikube versions v0.19.0, v0.21.0 and v0.23.0:

  1. Force minikube to start with a compatible server version of kubernetes. Version v1.5.3 worked for me. I was facing this issue with minikube versions v0.21.0 (that uses kubernetes version v1.7.0) and v0.23.0 (that uses kubernetes version v1.7.0). To start minikube with a specific kubernetes version, use the command: minikube start --kubernetes-version v1.5.3
  2. The second issue was that, like you, I was using KAFKA_ZOOKEEPER_CONNECT: zookeeper-service:2181. This setting seemed to work fine when minikube was started with kubernetes client and server version v1.5.3 (i.e. using minikube version v0.17.1), but it seemed to fail for kubernetes server version v1.7.0 (i.e. using minikube versions v0.21.0 and v0.23.0). The workaround was to expose the zookeeper service using type NodePort like so:

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: zookeeper-service
      name: zookeeper-service
    spec:
      type: NodePort
      ports:
      - name: zookeeper-port
        port: 2181
        nodePort: 30181
        targetPort: 2181
      selector:
        app: zookeeper

and updating the kafka-deployment.yml like so:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: ""
        - name: KAFKA_ADVERTISED_PORT
          value: "30123"
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_CREATE_TOPICS
          value: "demo:1:1"
        image: wurstmeister/kafka
        imagePullPolicy: Always
        name: kafka
        ports:
        - containerPort: 9092
echo "Am I receiving this message?" | kafkacat -P -b -t demo

kafkacat -C -b -t demo

% Reached end of topic demo [0] at offset 0

Am I receiving this message?

Unfortunately, the latest minikube version v0.25.0 (that uses kubernetes version v1.9.0) does not support downgrading the kubernetes version, so this workaround does not work with the latest minikube version.

If anyone else finds a better solution to this issue, please do update this thread!


Abhilash Nair