Kafka on Kubernetes cannot produce/consume topics (ClosedChannelException, ErrorLoggingCallback)

I run one Kafka broker and three ZooKeeper servers in Docker on Kubernetes, following this instruction. I cannot produce/consume topics from outside the pod (Docker container).

bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test

[2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0

bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning 

[2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-NV49C-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more


Kafka log:

[2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
[2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

And the config/server.properties:

broker.id=-1
log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3
num.partitions=1
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
zookeeper.connection.timeout.ms=6000

service.port.9092.tcp.addr=10.254.68.65
service.port.9092.tcp.proto=tcp
service.service.port.kafka.port=9092
service.service.port=9092
service.port=tcp://10.254.68.65:9092
service.port.9092.tcp.port=9092
version=0.10.0.0
service.service.host=10.254.68.65
port=9092
advertised.host.name=kafka-service
service.port.9092.tcp=tcp://10.254.68.65:9092
advertised.port=9092

But I can run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test and bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning if I am inside the pod (Docker container).

And I can create and list topics normally when connecting to the ZooKeeper service:

bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
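
For reference, a rough check from the client machine outside the cluster (the names and addresses are the same placeholders used above) hints at the difference between the two paths:

nslookup kafka-service        # would only resolve if the advertised name exists outside the cluster DNS
nc -vz 5.6.7.8 2181           # the ZooKeeper service address itself is reachable, since kafka-topics.sh above works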

And my YAML file for creating the Kafka replication controller and service:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service2
  labels:
    app: kafka2
spec:
  clusterIP: None
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka2
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller2
spec:
  replicas: 1
  selector:
    app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka2
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-service2
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
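
For completeness, the manifest is created and checked in the usual way (the file name kafka.yaml is just an assumed name for the YAML above):

kubectl create -f kafka.yaml
kubectl get svc kafka-service2        # the headless service defined above
kubectl get pods -l app=kafka2        # the broker pod managed by the replication controller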
Asked Jun 11 '16 by Haoyuan Ge



1 Answer

Kafka registers itself in ZooKeeper with its service name, and consuming/producing messages requires access to the service names (here, DNS records such as zookeeper-1, zookeeper-2, zookeeper-3), which are only resolvable through Kubernetes' DNS. So only applications running inside Kubernetes can access my Kafka. That is why I cannot reach it through the external IP of kafka-service, or by port-forwarding the Kafka pod to localhost.
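
A possible way around this (just a sketch with placeholder values, not what the manifest above does) is to advertise an address that external clients can actually resolve and to expose the broker port on the nodes, for example with an extra NodePort service; the service name kafka-external, the nodePort 30092, and the IP 1.2.3.4 below are assumptions:

apiVersion: v1
kind: Service
metadata:
  name: kafka-external            # hypothetical extra service for clients outside the cluster
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 30092               # placeholder; must be in the cluster's NodePort range
  selector:
    app: kafka2

and, in the container env, advertise something the external client can reach:

        env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "1.2.3.4"        # placeholder: a node/external IP reachable from the client
        - name: KAFKA_ADVERTISED_PORT
          value: "30092"

The trade-off is that with a single advertised address, clients inside the cluster also have to use that address.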

But why can I create, list, and describe topics from outside the Kubernetes cluster? I guess it is because ZooKeeper can handle those operations by itself, whereas consuming/producing messages needs access to the ADVERTISED_HOST_NAME provided by Kafka.
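
A rough way to sanity-check that explanation (a workaround for testing, not a proper fix) is to make the advertised name resolvable on the client machine, assuming the external address really does route to the broker's port 9092:

echo "1.2.3.4 kafka-service" | sudo tee -a /etc/hosts    # 1.2.3.4 is the example broker address from the question
bin/kafka-console-producer.sh --broker-list kafka-service:9092 --topic test

If the only problem is resolving the advertised host name, the producer should stop timing out after this.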

Answered Nov 24 '22 by Haoyuan Ge