What I am doing currently is as below:
val topic = "mytopic"
val zkhosts = "localhost"
val zkports = "2181"
Setting these values in my code and then passing them to the KafkaStream function works, but I want to read them from a .properties
file. Is there a way to do that?
The Kafka configuration files are located at the /opt/bitnami/kafka/config/ directory.
With acks = -1 (equivalent to acks = all), the producer waits for the acknowledgement, which the leader broker sends only after the messages have been replicated to all in-sync replicas on the other brokers.
connect property and change it as per your needs. The Kafka broker will connect to this ZooKeeper instance. Go to the Kafka home directory and execute the command ./bin/kafka-server-start.sh config/server.properties.
Apache Kafka is bundled with Log Analysis. The sample configuration files for Apache Kafka are in the <HOME>/IBM/LogAnalysis/kafka/test-configs/kafka-configs directory. Create one partition per topic for every two physical processors on the server where the broker is installed.
Given this property file at /tmp/sample.properties
kafka.topic = mytopic
kafka.zkhost = localhost
kafka.zkports = 2191
We could use the plain old Java Properties
API to load the properties:
import java.io.{File, FileReader}
import java.util.Properties

val configFile = new File("/tmp/sample.properties")
val reader = new FileReader(configFile)
val props = new Properties()
// Close the reader even if loading fails
try props.load(reader) finally reader.close()
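Once the properties are loaded, the individual values still have to be pulled out and converted. The following is a minimal sketch of one way to do that, not the only one: `KafkaSettings` and `KafkaSettingsLoader` are hypothetical names introduced here for illustration, and the keys are assumed to match the sample file above. Note that `java.util.Properties` keeps surrounding quotes as part of the value, so the values are assumed to be unquoted in the file.

```scala
import java.io.{File, FileReader}
import java.util.Properties

// Hypothetical holder for the three settings from the question.
final case class KafkaSettings(topic: String, zkHost: String, zkPort: Int)

object KafkaSettingsLoader {
  // Load a .properties file, closing the reader even if loading fails.
  def loadProperties(path: String): Properties = {
    val props = new Properties()
    val reader = new FileReader(new File(path))
    try props.load(reader) finally reader.close()
    props
  }

  // Return the trimmed value for `key`, or fail with a clear message.
  private def required(props: Properties, key: String): String =
    Option(props.getProperty(key)).map(_.trim).getOrElse(
      throw new IllegalArgumentException(s"Missing property: $key"))

  // Convert the raw string properties into typed settings.
  def fromProperties(props: Properties): KafkaSettings =
    KafkaSettings(
      topic  = required(props, "kafka.topic"),
      zkHost = required(props, "kafka.zkhost"),
      zkPort = required(props, "kafka.zkports").toInt)
}
```

With this in place, `KafkaSettingsLoader.fromProperties(KafkaSettingsLoader.loadProperties("/tmp/sample.properties"))` would give you typed values to pass on to your stream setup.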
You could also use your favorite config library to load the property file as you would in any other program.
For example, you could use the popular Typesafe Config library. There are many wrappers for Scala, but in its raw form you could do something like:
import com.typesafe.config.ConfigFactory
import java.util.Properties

val configFile = new java.io.File("/tmp/sample.properties")
val kafkaConfig = ConfigFactory.parseFile(configFile)

// Copy the values into a Properties object under the target key names
val kafkaProperties = new Properties()
kafkaProperties.put("zookeeper.hosts", kafkaConfig.getString("kafka.zkhost"))
kafkaProperties.put("zookeeper.port", kafkaConfig.getInt("kafka.zkports"): java.lang.Integer)
kafkaProperties.put("kafka.topic", kafkaConfig.getString("kafka.topic"))
(There are many ways to make that nice and compact. Here I'm using the most common form.)
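As one illustration of a more compact form, the key-by-key copying can be driven by a list of (source key, target key) pairs. This sketch deliberately uses only `java.util.Properties` as the source rather than Typesafe Config, so it runs without extra dependencies. `PropertyRenamer` and `rename` are hypothetical names, and all values stay strings here (the port is not converted to an `Integer`).

```scala
import java.util.Properties

object PropertyRenamer {
  // Copy selected keys from `source` into a new Properties object,
  // storing each value under its new name. Keys missing from
  // `source` are skipped rather than stored as null.
  def rename(source: Properties, mapping: Seq[(String, String)]): Properties = {
    val target = new Properties()
    for ((from, to) <- mapping; value <- Option(source.getProperty(from)))
      target.setProperty(to, value)
    target
  }
}

// Key pairs taken from the example above: source key -> target key.
val kafkaKeyMapping = Seq(
  "kafka.zkhost"  -> "zookeeper.hosts",
  "kafka.zkports" -> "zookeeper.port",
  "kafka.topic"   -> "kafka.topic")
```

Calling `PropertyRenamer.rename(props, kafkaKeyMapping)` on a loaded `Properties` object would then produce the renamed set in one step.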