I am running 1 Kafka broker and 3 ZooKeeper servers in Docker on Kubernetes, following this instruction. I cannot produce/consume topics from outside the pod (Docker container).
bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test
[2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0
bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning
[2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-NV49C-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
Kafka log:
[2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
[2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
And the config/server.properties:
broker.id=-1
log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3
num.partitions=1
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
zookeeper.connection.timeout.ms=6000
service.port.9092.tcp.addr=10.254.68.65
service.port.9092.tcp.proto=tcp
service.service.port.kafka.port=9092
service.service.port=9092
service.port=tcp://10.254.68.65:9092
service.port.9092.tcp.port=9092
version=0.10.0.0
service.service.host=10.254.68.65
port=9092
advertised.host.name=kafka-service
service.port.9092.tcp=tcp://10.254.68.65:9092
advertised.port=9092
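(The advertised.host.name=kafka-service entry is the address the broker hands back to clients in metadata responses, regardless of which address they used to connect. One way to inspect what the broker has actually registered, assuming broker id 1001 as in the log above, is the ZooKeeper shell:)
# open an interactive ZooKeeper shell against one of the ensemble members
bin/zookeeper-shell.sh 5.6.7.8:2181
# at the prompt, dump the broker registration; the JSON it returns
# contains the host/port that clients are told to connect to
get /brokers/ids/1001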
But I can run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
and bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning
if I am inside the pod (Docker container).
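(The same thing can be done without opening a shell in the container, via kubectl exec; a sketch, assuming the pod is named kafka-controller-3rsv3 as the log directory above suggests, and that the Kafka scripts are on the PATH inside the wurstmeister/kafka image:)
# run the console producer inside the Kafka pod
kubectl exec -it kafka-controller-3rsv3 -- kafka-console-producer.sh --broker-list localhost:9092 --topic test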
And I can create and list topics normally when connecting to ZooKeeper's service:
bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
And my YAML file for creating the Kafka replication controller and service:
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service2
  labels:
    app: kafka2
spec:
  clusterIP: None
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka2
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller2
spec:
  replicas: 1
  selector:
    app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka2
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-service2
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
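(Since clusterIP is None, kafka-service2 is a headless Service, so its DNS name resolves directly to the pod IP rather than to a virtual service IP. To double-check what the Service actually points at, assuming kubectl is configured for this cluster:)
# show the Service; no cluster IP is allocated for a headless Service
kubectl get svc kafka-service2
# show the pod endpoints the Service name resolves to inside the cluster
kubectl get endpoints kafka-service2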
Kafka registers itself in ZooKeeper under its service's name, and consuming/producing messages requires access to those service names (here, the DNS records zookeeper-1, zookeeper-2 and zookeeper-3), which are only resolvable through Kubernetes' DNS. So only applications running inside Kubernetes can access my Kafka. Therefore I cannot use the external IP of kafka-service, or port-forward the Kafka pod to localhost and then access it.
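(This is easy to confirm from a machine outside the cluster: the advertised name simply does not resolve there, while pods inside the cluster resolve it through the cluster DNS. A minimal check, using the advertised name kafka-service from the config above:)
# from a host outside the Kubernetes cluster this lookup fails,
# because kafka-service only exists in the cluster-internal DNS
nslookup kafka-service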
But why can I create, list and describe topics from outside the Kubernetes cluster? I guess it is because the ZooKeeper servers can perform those operations by themselves, whereas consuming/producing messages needs access to the ADVERTISED_HOST_NAME provided by Kafka.
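One direction that might work (just a sketch, not something I have verified) is to expose the broker on a node port and make it advertise an address external clients can actually resolve and reach, e.g. a node IP; the service name kafka-external below is arbitrary:
# expose the existing replication controller on a port of every node
kubectl expose rc kafka-controller2 --type=NodePort --name=kafka-external --port=9092 --target-port=9092
# find out which node port was allocated
kubectl describe svc kafka-external
The broker would then also need KAFKA_ADVERTISED_HOST_NAME (and KAFKA_ADVERTISED_PORT) pointed at that externally reachable node address and port, otherwise the metadata it hands out would still name kafka-service2.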