I have a Kafka Streams application waiting for records to be published on the topic user_activity. It will receive JSON data, and depending on the value associated with a key I want to push that stream into different topics.
This is my Streams app code:
KStream<String, String> source_user_activity = builder.stream("user_activity");

source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        System.out.println("value: " + value);
        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = new JSONObject(value);
            send.put("current_date", getCurrentDate().toString());
            send.put("activity_time", received.get("CreationTime"));
            send.put("user_id", received.get("UserId"));
            send.put("operation_type", received.get("Operation"));
            send.put("app_name", received.get("Workload"));
            keywords.add(send.toString());
            // apply regex to value and for each match add it to keywords
        } catch (Exception e) {
            // TODO: handle exception
            System.err.println("Unable to convert to json");
            e.printStackTrace();
        }
        return keywords;
    }
}).to("user_activity_by_date");
In this code, I want to check the operation type and, depending on it, push the records to the relevant topic.
How can I achieve this?
EDIT:
I have updated my code to this:
final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

KStream<String, String>[] branches = source_o365_user_activity.branch(
    (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
    (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
    (key, value) -> true
);

branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
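Note: in newer Kafka Streams releases (2.8+), KStream#branch is deprecated in favor of split(), which returns named branches instead of an array. A minimal sketch of the same routing with that API (the Named prefix and the use of Branched.withConsumer are my choices, not from the original code):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("o365_user_activity");

// Each branch is terminated with a consumer that writes matching records
// to its target topic; records matching no predicate hit the default branch.
source.split(Named.as("o365-"))
    .branch(
        (key, value) -> value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File"),
        Branched.withConsumer(ks -> ks.to("o365_sharing_set_by_date")))
    .branch(
        (key, value) -> value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File"),
        Branched.withConsumer(ks -> ks.to("o365_added_to_secure_link_by_date")))
    .defaultBranch(Branched.withConsumer(ks -> ks.to("o365_user_activity_by_date")));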
You can use the branch method to split your stream. It takes a list of predicates and routes each record to the stream of the first predicate it matches.
The code below is taken from kafka-streams-examples:
KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

forks[0].mapValues(
        orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

forks[1].mapValues(
        orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
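On a side note, matching substrings of the raw JSON (value.contains(...)) is brittle; a predicate can parse the value and compare the actual fields instead. A sketch using the org.json parsing already present in the question (the isOperation helper is hypothetical, a name of my own):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.json.JSONObject;

// Hypothetical helper: parse each record once and compare the Operation and
// ItemType fields directly instead of substring-matching the raw JSON string.
static Predicate<String, String> isOperation(String operation) {
    return (key, value) -> {
        try {
            JSONObject json = new JSONObject(value);
            return operation.equals(json.optString("Operation"))
                    && "File".equals(json.optString("ItemType"));
        } catch (Exception e) {
            return false; // malformed records fall through to the catch-all branch
        }
    };
}

// Used with the branch() call from the edited code above:
KStream<String, String>[] branches = source_o365_user_activity.branch(
    isOperation("SharingSet"),
    isOperation("AddedToSecureLink"),
    (key, value) -> true);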