I'm evaluating Amazon Managed Streaming for Apache Kafka (MSK). I know it is currently in preview, so it may not have every feature or complete documentation. I set up an MSK cluster to check whether it can meet all of our company's use cases and requirements, but documentation and examples are currently lacking.
https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html
I have the following queries:
i) How can I access AWS MSK with the Kafka clients running on my on-premise system?
ii) Does MSK support schema evolution and exactly once semantics?
iii) Will MSK provide some way to update cluster or tuning configuration? For example, AWS Glue lets you change Spark executor and driver memory in its managed environment.
iv) Is it possible to integrate MSK with other AWS services (e.g. Redshift, EMR, etc.)?
v) Can I use streaming sql with MSK through ksql? How can I set up KSQL with MSK?
vi) How can I perform real-time predictive analysis of data flowing through MSK?
vii) How reliable is MSK compared to other cloud-based Kafka clusters from Azure/Confluent, and are there any performance benchmarks against vanilla Kafka? Also, what is the maximum number of brokers that can be launched in a cluster?
MSK is basically a vanilla Apache Kafka cluster customized and managed by AWS (with predefined configuration settings based on the cluster instance type, number of brokers, etc.) and tuned for the cloud environment.
Ideally, it should be able to do all or most of the things that open-source Kafka supports. If you have a specific use case or requirement that is not documented, I suggest contacting AWS support for clarification on the managed side of the cluster (maximum number of brokers allowed, reliability, cost).
I will try to answer your questions based on my personal experience:
i) How can I access AWS MSK with the kafka clients running on my on premise system?
You cannot access MSK directly from an on-premise or local machine using a Kafka client or Kafka Streams, because the broker URLs and the ZooKeeper connection string are private IPs in the MSK cluster's VPC/subnet. To use a Kafka client, you need to launch an EC2 instance in the same VPC as MSK and run the client (producer/consumer) there to access the MSK cluster.
To access the MSK cluster from a local machine or on-premise system, you can set up the Kafka REST Proxy open-sourced by Confluent and reach the cluster from the outside world via a REST API. It is not a full-fledged Kafka client and doesn't support every client operation, but it covers most of them: fetching cluster metadata and topic information, producing and consuming messages, etc.
First set up the Confluent repo and the EC2 instance security group (refer to Section-1: Pre Install or set up- additional kafka components), then install and set up the Kafka REST Proxy.
sudo yum install confluent-kafka-rest
Create a file named kafka-rest.properties and add the following content:
bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081
** Modify the bootstrap server and ZooKeeper URLs/IPs to match your cluster.
Start the REST server
kafka-rest-start kafka-rest.properties &
Access MSK via the REST API with curl or a REST client/browser.
Get a list of topics
curl "http://localhost:8082/topics"
curl "http://<ec2 instance public ip>:8082/topics"
To access it from an on-premise or local machine, make sure the EC2 instance running the REST server has a public IP or Elastic IP attached.
More REST API operations: https://github.com/confluentinc/kafka-rest
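Producing through the proxy works the same way as listing topics. Below is a minimal Python sketch, assuming the proxy exposes the Confluent REST Proxy v2 API (the `application/vnd.kafka.json.v2+json` content type) on port 8082. The function names `build_produce_request` and `produce` are my own, not part of any library.

```python
import json
import urllib.request

# Content type defined by the Confluent REST Proxy v2 API for JSON-encoded records.
JSON_V2 = "application/vnd.kafka.json.v2+json"


def build_produce_request(base_url, topic, values):
    """Build (but do not send) a POST request that produces JSON records to a topic."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/topics/{topic}",
        data=body,
        headers={"Content-Type": JSON_V2, "Accept": "application/vnd.kafka.v2+json"},
        method="POST",
    )


def produce(base_url, topic, values, timeout=10):
    """Send the request and return the proxy's decoded JSON response."""
    request = build_produce_request(base_url, topic, values)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

From an on-premise machine you would call something like `produce("http://<ec2 instance public ip>:8082", "jsontest", [{"id": 1}])`, provided port 8082 is open in the instance's security group.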
ii) Does MSK support schema evolution and exactly once semantics?
You can use Avro messages along with the Schema Registry to achieve schema evolution and schema maintenance.
Installing and setting up the Schema Registry is similar to the Confluent Kafka REST Proxy.
sudo yum install confluent-schema-registry
Create a file named schema-registry.properties and add the following content:
listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false
** Modify the bootstrap server and ZooKeeper (connection) URLs/IPs to match your cluster.
Start the schema registry service
schema-registry-start schema-registry.properties &
Refer for more information: https://github.com/confluentinc/schema-registry
https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html
Exactly-once semantics is an Apache Kafka feature. Although I haven't tested it on MSK, I believe it should be supported, since it is part of open-source Apache Kafka.
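For reference, exactly-once in open-source Kafka is driven by a handful of client properties. The sketch below only assembles them into dictionaries. The property names are standard Kafka settings, but the helper `exactly_once_configs` and the example IDs are hypothetical, and as said above I have not verified this against MSK.

```python
def exactly_once_configs(bootstrap_servers, transactional_id, group_id):
    """Return (producer_config, consumer_config) dicts using standard Kafka property names."""
    producer = {
        "bootstrap.servers": bootstrap_servers,
        "enable.idempotence": True,            # broker de-duplicates retried sends
        "acks": "all",                         # required when idempotence is enabled
        "transactional.id": transactional_id,  # stable and unique per producer instance
    }
    consumer = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "isolation.level": "read_committed",   # skip records from aborted transactions
        "enable.auto.commit": False,           # offsets are committed within the transaction
    }
    return producer, consumer
```

These dictionaries can be passed to whichever Kafka client library you use, as long as it supports transactions. The producer then wraps its sends in begin/commit transaction calls.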
iii) Will MSK provide some way to update some cluster or tuning configuration? Like aws glue provides parameter change for spark executor and driver memory in their managed environment.
Yes, it is possible to change configuration parameters at runtime. I tested this by changing the retention.ms parameter with the Kafka config tool, and the change was applied to the topic immediately. So I think you can update other parameters as well, but MSK might not allow every config change, just as AWS Glue allows only a few Spark config parameters to be changed, because letting users change every parameter could put the managed environment at risk.
Change through kafka config tool
kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181 --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000
Verified change using rest
curl "http://localhost:8082/topics/jsontest"
Amazon MSK now lets you create a custom MSK configuration.
Refer to the doc below for the config parameters that can be updated:
https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html
Also the default configuration of MSK Kafka:
https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html
iv) Is it possible to integrate MSK with other AWS services (e.g. Redshift, EMR, etc.)?
Yes, you can integrate MSK with other AWS services. For example, you can run a Kafka client (consumer) that reads data from Kafka and writes it to Redshift, RDS, S3 or DynamoDB. Make sure the Kafka client runs on an EC2 instance (inside the MSK VPC) that has a proper IAM role to access those services, and that the instance is in a public subnet or in a private subnet with a NAT or a VPC endpoint for S3.
You can also launch EMR inside the MSK cluster's VPC/subnet and then connect to other services through EMR (Spark).
Spark Structured Streaming with MSK
Launch an EMR cluster in the VPC of the MSK cluster.
Allow the EMR master and slave security groups in the inbound rules of the MSK cluster's security group for port 9092.
Start Spark shell
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
Connect to the MSK cluster from Spark Structured Streaming
val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()
Start reading/printing the message on the console
val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()
or
val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()
v) Can I use streaming sql with MSK through ksql? How can I set up KSQL with MSK?
Yes, you can set up KSQL with an MSK cluster. Basically, you need to launch an EC2 instance in the same VPC/subnet as the MSK cluster, then install the KSQL server and client on that instance and use them.
First set up the Confluent repo and the EC2 instance security group (refer to Section-1: Pre Install or set up- additional kafka components), then install and set up the KSQL server/client.
After that, install the KSQL server
sudo yum install confluent-ksql
Create a file named ksql-server.properties and add the following content:
bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088
** Modify the bootstrap server IPs/URLs to match your cluster.
Start the KSQL server
ksql-server-start ksql-server.properties &
After that, start the KSQL CLI
ksql http://localhost:8088
Finally, run the command to list the topics
ksql> SHOW TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
_schemas | false | 1 | 3 | 0 | 0
jsontest | false | 1 | 3 | 1 | 1
-----------------------------------------------------------------------------------------
Refer for more info- https://github.com/confluentinc/ksql
vi) How can I perform real time predictive analysis of data flowing through MSK?
Predictive analysis or real-time machine learning is not really specific to MSK. Whatever you would do with a Kafka cluster (or any streaming pipeline) applies to MSK as well. There are various ways to achieve it depending on your exact requirements, but I will describe the most common ones used across the industry:
Use Spark with MSK (Kafka) and do the analysis through Structured Streaming and MLlib (which holds your predictive model).
Train your predictive model in the H2O.ai framework and export it as a Java POJO. Then integrate the POJO model into the Kafka consumer code that processes messages from the MSK (Kafka) topic and does the real-time analysis.
Train and deploy a model in SageMaker, then have the Kafka consumer code invoke the SageMaker model inference endpoint with the Kafka data/messages to get real-time predictions.
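To make the SageMaker option a bit more concrete, here is a rough Python sketch. It assumes the messages are JSON and that the endpoint accepts and returns `application/json`. The names `score_messages` and `sagemaker_predictor` are hypothetical, and the Kafka consumer loop itself is left out, so `messages` is just any iterable of raw message values.

```python
import json


def score_messages(messages, predict):
    """Decode each raw Kafka message value as JSON and attach the model's prediction."""
    results = []
    for raw in messages:
        features = json.loads(raw)
        results.append({"input": features, "prediction": predict(features)})
    return results


def sagemaker_predictor(endpoint_name, region="us-east-1"):
    """Return a predict(features) callable backed by a SageMaker inference endpoint."""
    import boto3  # imported lazily so score_messages can be used without AWS access

    runtime = boto3.client("sagemaker-runtime", region_name=region)

    def predict(features):
        response = runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json",  # assumption: the model accepts JSON input
            Body=json.dumps(features),
        )
        return json.loads(response["Body"].read())

    return predict
```

Keeping `predict` as a plain callable means the same loop works with a local model (for example an exported H2O model behind a small wrapper) instead of SageMaker.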
vii) How reliable is MSK compared to other cloud-based Kafka clusters from Azure/Confluent, and are there any performance benchmarks against vanilla Kafka? Also, what is the maximum number of brokers that can be launched in a cluster?
MSK is in preview, as you already know, so it's a little too early to judge its reliability. In general, like other AWS services, it should become more reliable over time, hopefully along with new features and better documentation.
I don't think AWS or any other cloud vendor (Azure, Google Cloud) publishes performance benchmarks for its services, so you will have to run performance tests yourself. Kafka ships with benchmark scripts (kafka-producer-perf-test.sh, kafka-consumer-perf-test.sh) that you can run to get an idea of the cluster's performance. Keep in mind that performance in a real production scenario varies a lot with factors such as message size, the volume of data coming into Kafka, sync vs. async producers and the number of consumers, so it comes down to your specific use case rather than a generic benchmark.
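Since message size matters so much, it can help to run the producer benchmark over several record sizes. The Python sketch below only builds the command lines for kafka-producer-perf-test.sh. The helper names are mine, and the record counts are arbitrary placeholders rather than recommendations.

```python
def producer_perf_command(bootstrap_servers, topic, num_records, record_size, throughput=-1):
    """Build the argv for one kafka-producer-perf-test.sh run (throughput=-1 disables throttling)."""
    return [
        "kafka-producer-perf-test.sh",
        "--topic", topic,
        "--num-records", str(num_records),
        "--record-size", str(record_size),
        "--throughput", str(throughput),
        "--producer-props", f"bootstrap.servers={bootstrap_servers}",
    ]


def perf_matrix(bootstrap_servers, topic, record_sizes, num_records=100000):
    """Return one benchmark command per record size so runs can be compared."""
    return [
        producer_perf_command(bootstrap_servers, topic, num_records, size)
        for size in record_sizes
    ]
```

Each returned list can be handed to `subprocess.run(cmd, check=True)` on an EC2 instance inside the MSK VPC that has the Kafka CLI tools on its PATH.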
Regarding the maximum number of brokers supported in a cluster, it is better to ask AWS through their support system.
Section-1: Pre Install or set up- additional kafka components:
Launch an EC2 instance in the MSK cluster's VPC/subnet.
Log in to the EC2 instance.
Set up the yum repo to download the Confluent Kafka component packages through yum
sudo yum install curl which
sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key
Navigate to /etc/yum.repos.d/, create a file named confluent.repo and add the following contents
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
Next clean yum repo
sudo yum clean all
Allow the security group of the EC2 instance in the inbound rules of the MSK cluster's security group for ports 9092 (connecting to the brokers) and 2181 (connecting to ZooKeeper).
Section-2: Command to get the MSK clusters broker and zookeeper url/ip info
Zookeeper connection url port
aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn>
Broker connection url port
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn>
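The same lookup can be scripted and fed straight into the properties files used earlier. This is only a sketch: it assumes boto3 is installed and AWS credentials are configured, and that the cluster exposes plaintext brokers (otherwise `BootstrapBrokerString` may be missing from the response). The function names are my own.

```python
def render_rest_proxy_properties(bootstrap_brokers, zookeeper_connect,
                                 schema_registry_url="http://localhost:8081"):
    """Render kafka-rest.properties content from the connection strings MSK returns."""
    brokers = ",".join(f"PLAINTEXT://{b.strip()}" for b in bootstrap_brokers.split(","))
    return "\n".join([
        f"bootstrap.servers={brokers}",
        f"zookeeper.connect={zookeeper_connect}",
        f"schema.registry.url={schema_registry_url}",
    ]) + "\n"


def fetch_connection_strings(cluster_arn, region="us-east-1"):
    """Return (bootstrap_brokers, zookeeper_connect) for an MSK cluster via boto3."""
    import boto3  # lazy import: only needed when actually calling AWS

    kafka = boto3.client("kafka", region_name=region)
    brokers = kafka.get_bootstrap_brokers(ClusterArn=cluster_arn)["BootstrapBrokerString"]
    cluster = kafka.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
    return brokers, cluster["ZookeeperConnectString"]
```

Writing the result of `render_rest_proxy_properties(*fetch_connection_strings("<cluster arn>"))` to kafka-rest.properties avoids copying IPs by hand.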
----------------------------------------------------------------------
Note:
MSK overview and components settings:
Kindly refer to the MSK high-level architecture for how to set up the various components (REST proxy, Schema Registry, sticky load balancer, etc.) and how they connect with other AWS services. It is only a simple reference architecture.
Instead of setting up the REST proxy, Schema Registry and KSQL directly on the EC2 instance, you can also run them in Docker containers.
If you set up multiple REST proxies, put the REST proxy service behind a sticky load balancer (for example nginx using ip_hash) so that the same client always maps to the same proxy instance and consumer group, which avoids mismatches and inconsistencies when reading data.
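The idea behind sticky routing is simply that a given client IP always lands on the same backend. The Python sketch below illustrates it with plain hash-mod routing. It is not nginx's actual ip_hash algorithm, and the backend addresses in the usage note are hypothetical.

```python
import hashlib


def pick_backend(client_ip, backends):
    """Map a client IP to one backend deterministically (simple hash-mod routing)."""
    digest = hashlib.sha256(client_ip.encode("utf-8")).digest()
    # Use the first 8 bytes of the digest as an integer and wrap around the backend list.
    return backends[int.from_bytes(digest[:8], "big") % len(backends)]
```

For example, `pick_backend("203.0.113.7", ["http://10.0.1.10:8082", "http://10.0.1.11:8082"])` returns the same proxy on every call, so a consumer created through that proxy is always reached again. Note that the mapping changes if the backend list changes.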
Hope you find the above information useful !!