Using kubernetes-kafka as a starting point with minikube.
This uses a StatefulSet and a headless service for service discovery within the cluster.
The goal is to expose the individual Kafka Brokers externally which are internally addressed as:
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
The constraint is that this external service be able to address the brokers specifically.
What's the right (or one possible) way of going about this? Is it possible to expose an external service per kafka-x.broker.kafka.svc.cluster.local:9092?
A StatefulSet can use a Headless Service to control the domain of its Pods. The domain managed by this Service takes the form: $(service name).
You can create individual Services for each instance that use that label as their selector to expose the individual instances of the StatefulSet. To remove a potential extra hop, create the Services with the attribute externalTrafficPolicy and set it to Local.
From the Service type drop-down list, select Cluster IP. Click Expose. When your Service is ready, the Service details page opens, and you can see details about your Service. Under Cluster IP, make a note of the IP address that Kubernetes assigned to your Service.
We have solved this in 1.7 by changing the headless service to Type=NodePort and setting externalTrafficPolicy=Local. This bypasses the internal load balancing of a Service, so traffic sent to a specific node on that node port only works if a Kafka pod is running on that node.
apiVersion: v1
kind: Service
metadata:
  name: broker
spec:
  externalTrafficPolicy: Local
  ports:
  - nodePort: 30000
    port: 30000
    protocol: TCP
    targetPort: 9092
  selector:
    app: broker
  type: NodePort
For example, say we have two nodes, nodeA and nodeB, and only nodeB is running a kafka pod. nodeA:30000 will not connect, but nodeB:30000 will connect to the kafka pod running on nodeB.
https://kubernetes.io/docs/tutorials/services/source-ip/#source-ip-for-services-with-typenodeport
Note this was also available in 1.5 and 1.6 as a beta annotation, more can be found here on feature availability: https://kubernetes.io/docs/tasks/access-application-cluster/create-external-load-balancer/#preserving-the-client-source-ip
Note also that while this ties a kafka pod to a specific external network identity, it does not guarantee that your storage volume will be tied to that network identity. If you are using the VolumeClaimTemplates in a StatefulSet then your volumes are tied to the pod while kafka expects the volume to be tied to the network identity.
For example, if the kafka-0 pod restarts and comes up on nodeC instead of nodeA, kafka-0's PVC (if using VolumeClaimTemplates) still holds data written for nodeA, and the broker running in kafka-0 starts rejecting requests because it believes it is on nodeA rather than nodeC.
To fix this, we are looking forward to Local Persistent Volumes, but right now we have a single PVC for our kafka StatefulSet, and data is stored under $NODENAME on that PVC to tie volume data to a particular node.
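As a rough illustration of the per-node directory idea, here is a minimal sketch. The helper name node_data_dir and the temporary mount root are hypothetical, and it assumes NODENAME is injected into the container (for example through the Downward API, fieldRef spec.nodeName); it is not the exact script we run.

```shell
#!/bin/sh
# node_data_dir (hypothetical helper): create and print a per-node
# subdirectory on a shared volume, so data written on one node is never
# picked up by a broker that lands on a different node.
node_data_dir() {
  mount_root=$1   # where the single PVC is mounted (assumption)
  node_name=$2    # value of $NODENAME inside the pod (assumption)
  dir="$mount_root/$node_name"
  mkdir -p "$dir"
  echo "$dir"
}

# Example run: a temp directory stands in for the PVC mount point,
# and nodeA is used when NODENAME is not set.
root=$(mktemp -d)
node_data_dir "$root" "${NODENAME:-nodeA}"
```

The printed path is what Kafka's log.dirs setting would then point at.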
https://github.com/kubernetes/features/issues/121
https://kubernetes.io/docs/concepts/storage/volumes/#local
Solutions so far weren't quite satisfying enough for myself, so I'm going to post an answer of my own. My goals:
Starting with Yolean/kubernetes-kafka, the only thing missing is exposing the service externally, and there are two challenges in doing so.
Per pod labels and external services:
To generate labels per pod, this issue was really helpful. Using it as a guide, we add the following line to the init.sh property in 10broker-config.yml:
kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}
We keep the existing headless service, but we also generate an external Service per pod using the label (I added them to 20dns.yml):
apiVersion: v1
kind: Service
metadata:
  name: broker-0
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30093
  selector:
    kafka-set-component: kafka-0
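Writing one of these by hand per broker gets repetitive, so here is a small sketch that prints the same manifest for every ordinal. The function names, BASE_NODE_PORT and REPLICAS are my own illustrative assumptions (three brokers, node ports starting at 30093 as in the broker-0 manifest above), not something that ships with Yolean/kubernetes-kafka.

```shell
#!/bin/sh
# Sketch: print one NodePort Service manifest per broker ordinal.
# BASE_NODE_PORT and REPLICAS are assumptions for illustration.
BASE_NODE_PORT=30093
REPLICAS=3

render_broker_service() {
  ordinal=$1
  cat <<EOF
---
apiVersion: v1
kind: Service
metadata:
  name: broker-${ordinal}
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: $((BASE_NODE_PORT + ordinal))
  selector:
    kafka-set-component: kafka-${ordinal}
EOF
}

render_all_services() {
  i=0
  while [ "$i" -lt "$REPLICAS" ]; do
    render_broker_service "$i"
    i=$((i + 1))
  done
}

render_all_services
```

The output can be redirected into 20dns.yml or piped to kubectl apply -f - once it looks right.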
Configure Kafka with internal/external listeners
I found this issue incredibly useful in trying to understand how to configure Kafka.
This again requires updating the init.sh and server.properties properties in 10broker-config.yml with the following:
Add the following to server.properties to update the security protocols (currently using PLAINTEXT):
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT
Dynamically determine the external IP and the external port for each Pod in the init.sh:
EXTERNAL_LISTENER_IP=<your external addressable cluster ip>
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))
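In case the ${HOSTNAME##*-} expansion is unfamiliar: it strips everything up to and including the last "-" in the pod name, which leaves the StatefulSet ordinal. A tiny sketch of the port formula above (the external_port helper name is hypothetical, just for illustration):

```shell
#!/bin/sh
# external_port (hypothetical helper): derive the per-broker node port
# from a StatefulSet pod name, using the same formula as above.
external_port() {
  pod_name=$1
  ordinal=${pod_name##*-}   # kafka-2 -> 2
  echo $((30093 + ordinal))
}

for pod in kafka-0 kafka-1 kafka-2; do
  echo "$pod -> $(external_port "$pod")"
done
# prints:
# kafka-0 -> 30093
# kafka-1 -> 30094
# kafka-2 -> 30095
```

This is why each external Service needs a nodePort of 30093 plus its broker's ordinal.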
Then configure the listeners and advertised.listeners IPs for EXTERNAL_LISTENER and INTERNAL_LISTENER (also in the init.sh property):
sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties
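To sanity-check those substitutions before running them against the real file, something like the following can be used. It is only a sketch: it applies the same two rewrites to a scratch file containing the two commented-out defaults, uses "|" as the sed delimiter so the slashes need no escaping, and the pod name kafka-1 and address 192.0.2.10 are placeholder values I made up for the dry run.

```shell
#!/bin/sh
# Sketch: dry-run the listener rewrite on a scratch file.
# HOSTNAME and EXTERNAL_LISTENER_IP are placeholder values (assumptions).
HOSTNAME=kafka-1
EXTERNAL_LISTENER_IP=192.0.2.10
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))

rewrite_listeners() {
  # Same substitutions as above, written to stdout instead of in place.
  sed \
    -e "s|#listeners=PLAINTEXT://:9092|listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093|" \
    -e "s|#advertised.listeners=PLAINTEXT://your.host.name:9092|advertised.listeners=INTERNAL_PLAINTEXT://$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT|" \
    "$1"
}

# Scratch copy of the two default lines the substitutions target.
sample=$(mktemp)
printf '%s\n' \
  '#listeners=PLAINTEXT://:9092' \
  '#advertised.listeners=PLAINTEXT://your.host.name:9092' > "$sample"
rewrite_listeners "$sample"
rm -f "$sample"
```

If either line comes back still commented out, the pattern did not match the defaults in your server.properties and the real sed -i would silently do nothing.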
Obviously, this is not a full solution for production (for example, it does not address security for the externally exposed brokers), and I'm still refining my understanding of how to let internal producers/consumers communicate with the brokers as well.
However, so far this is the best approach for my understanding of Kubernetes and Kafka.