
Kafka in Kubernetes - Marking the coordinator dead for group

I am pretty new to Kubernetes and wanted to set up Kafka and ZooKeeper with it. I was able to set up Apache Kafka and ZooKeeper in Kubernetes using StatefulSets. I followed this and this to build my manifest file. I created one replica each of Kafka and ZooKeeper and used persistent volumes. All pods are running and ready.

I tried to expose Kafka through a Service of type NodePort, specifying nodePort 30010. As I understand it, this should expose Kafka to the outside world, so that external clients can send messages to the Kafka broker and consume from it.

But when I created a consumer in my Java application and set the bootstrap server to <ip-address>:30010, the following logs were displayed:

INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener

Interestingly, when I tested the cluster using kubectl commands, I was able to produce and consume messages:

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
 -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.default.svc.cluster.local:9093

Can someone point me in the right direction as to why it is marking the coordinator as dead?

kafka.yml

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-cs
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30010
    protocol: TCP
  selector:
    app: kafka
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 1
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - zk
                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafka-pv-volume
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
           command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        runAsUser: 0
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: kafka-pv-volume
    spec:
      storageClassName: manual
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi

zookeeper.yml

apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: zk
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: zk-hs
  replicas: 1
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10"
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=1 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: pv-volume
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 0
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: pv-volume
    spec:
      storageClassName: manual
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi

EDIT:

I changed the log level to TRACE. These are the logs I got:

2018-01-11 18:56:24,617 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 3, received {brokers=[{node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093,rack=null}],cluster_id=LwSLmJpTQf6tSKPsfvriIg,controller_id=0,topic_metadata=[{topic_error_code=0,topic=mdm.worker.request,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=0,leader=0,replicas=[0],isr=[0]}]}]}
2018-01-11 18:56:24,621 DEBUG o.a.k.c.Metadata - Updated cluster metadata version 2 to Cluster(id = LwSLmJpTQf6tSKPsfvriIg, nodes = [kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)], partitions = [Partition(topic = mdm.worker.request, partition = 0, leader = 0, replicas = [0], isr = [0])])
2018-01-11 18:56:24,622 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 10, received {error_code=0,coordinator={node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093}}
2018-01-11 18:56:24,624 DEBUG o.a.k.c.c.i.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1515678984622, latencyMs=798, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null))) for group workerListener
2018-01-11 18:56:24,625 INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
2018-01-11 18:56:24,625 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
2018-01-11 18:56:24,633 DEBUG o.a.k.c.NetworkClient - Error connecting to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:462)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:598)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:579)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
    ... 22 common frames omitted
2018-01-11 18:56:24,634 INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener
2018-01-11 18:56:24,735 TRACE o.a.k.c.NetworkClient - Found least loaded node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
2018-01-11 18:56:24,735 DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending GroupCoordinator request for group workerListener to broker kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
2018-01-11 18:56:24,735 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
2018-01-11 18:56:24,736 DEBUG o.a.k.c.NetworkClient - Error connecting to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:408)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:223)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
    ... 15 common frames omitted
2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Removing node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0
2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Least loaded node selection failed to find an available node
2018-01-11 18:56:24,738 DEBUG o.a.k.c.NetworkClient - Give up sending metadata request since no node is available
Sibtain asked Jan 10 '18 17:01

1 Answer

I had the same problem as you last week and solved it, so it's possible to expose Kafka outside Kubernetes!

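Solution: the broker advertises its cluster-internal DNS name to clients, and that name cannot be resolved outside the cluster (hence the "Can't resolve address" error in your trace). In your Kafka broker-config.yaml, you should advertise the cluster's external IP in addition to the internal DNS name, which has this form (I is the broker's pod ordinal):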

kafka-I.kafka-hs.default.svc.cluster.local:9093

How to:

Add these lines to your server.properties file:

listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT

If you have an init script that runs before the broker reads server.properties, add these lines to it:

# add unique label to each pod
kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}

EXTERNAL_LISTENER_IP=<YOUR_KUBERNETES_CLUSTER_EXTERNAL_IP>
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))

sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties

Otherwise, you need to find another way to replace these settings in your server.properties at runtime.
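
The external port in the init script is derived from the StatefulSet ordinal at the end of the pod name. A minimal sketch of that arithmetic, assuming pods named kafka-0, kafka-1, ... and a base nodePort of 30093 as in this answer; the external_port helper is hypothetical and only there for illustration:

```shell
#!/bin/sh
# Hypothetical helper: derive a broker's external port from its pod name.
# The default base port 30093 matches the nodePort of broker-0 in this answer.
external_port() {
  pod_name="$1"
  base_port="${2:-30093}"
  ordinal="${pod_name##*-}"        # strip everything up to the last "-": kafka-2 -> 2
  echo $((base_port + ordinal))
}

# Show the mapping for a three-broker StatefulSet.
for pod in kafka-0 kafka-1 kafka-2; do
  echo "$pod -> $(external_port "$pod")"
done
```

Each broker therefore needs its own NodePort service on the matching port, because clients are redirected to a specific broker rather than to any pod behind a shared service.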

Note that the sed commands only match if these lines are still commented out in your server.properties file:

#listeners=PLAINTEXT://:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
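
To check what the two substitutions produce, they can be tried on a throwaway file first. This is only a sketch under assumptions: the rewrite_listeners helper, the sample file, and the IP 203.0.113.10 (a documentation address) are made up for illustration, and it prints the result to stdout instead of editing in place with sed -i.

```shell
#!/bin/sh
# Hypothetical helper: print server.properties with both listener lines rewritten.
# Arguments: properties file, pod name, external IP, external port.
rewrite_listeners() {
  props_file="$1"; pod="$2"; ext_ip="$3"; ext_port="$4"
  # "|" is used as the sed delimiter so the slashes in the URLs need no escaping.
  sed \
    -e "s|^#listeners=PLAINTEXT://:9092|listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093|" \
    -e "s|^#advertised.listeners=PLAINTEXT://your.host.name:9092|advertised.listeners=INTERNAL_PLAINTEXT://${pod}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://${ext_ip}:${ext_port}|" \
    "$props_file"
}

# Try it on a sample file that contains only the two commented defaults.
sample="$(mktemp)"
printf '%s\n' \
  '#listeners=PLAINTEXT://:9092' \
  '#advertised.listeners=PLAINTEXT://your.host.name:9092' > "$sample"
rewrite_listeners "$sample" kafka-1 203.0.113.10 30094
rm -f "$sample"
```

If either line is printed unchanged, the pattern did not match and the broker would keep advertising only its internal name.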

Services: create a headless service to provide the internal DNS records, plus one NodePort service for each broker you have:

# A headless service to create DNS records
---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  ports:
  - port: 9092
  # [podname].broker.kafka.svc.cluster.local
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: broker-0
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30093
  selector:
    kafka-set-component: kafka-0
---
apiVersion: v1
kind: Service
metadata:
  name: broker-1
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30094
  selector:
    kafka-set-component: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: broker-2
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30095
  selector:
    kafka-set-component: kafka-2
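
The per-broker services above differ only in the ordinal, so they could also be generated rather than written by hand. A rough sketch, assuming the same names, namespace, and base nodePort 30093 as above; render_broker_service is a hypothetical helper, and the output would still need to be piped to kubectl apply -f - or saved to a file:

```shell
#!/bin/sh
# Hypothetical helper: print the NodePort Service manifest for one broker ordinal.
render_broker_service() {
  ordinal="$1"
  base_port="${2:-30093}"          # nodePort of broker-0 in this answer
  cat <<EOF
---
apiVersion: v1
kind: Service
metadata:
  name: broker-${ordinal}
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: $((base_port + ordinal))
  selector:
    kafka-set-component: kafka-${ordinal}
EOF
}

# Render manifests for brokers 0, 1 and 2.
for i in 0 1 2; do
  render_broker_service "$i"
done
```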

Notes: if you are running on GKE:

  1. YOUR_KUBERNETES_CLUSTER_EXTERNAL_IP, which is set in the init script above, can be found via gcloud compute instances list
  2. You must also open the NodePorts in the firewall: gcloud compute firewall-rules create kafka-external --allow tcp:30093,tcp:30094,tcp:30095
Ami Hollander answered Oct 06 '22 01:10