Kafka Streams - Send on different topics depending on Streams Data

I have a Kafka Streams application that consumes records published to the topic user_activity. It receives JSON data, and depending on the value of a particular key I want to send each record to a different topic.

This is my Streams application code:

KStream<String, String> source_user_activity = builder.stream("user_activity");
        source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                System.out.println("value: " +  value);
                ArrayList<String> keywords = new ArrayList<String>();
                try {
                    JSONObject send = new JSONObject();
                    JSONObject received = new JSONObject(value);

                    send.put("current_date", getCurrentDate().toString());
                    send.put("activity_time", received.get("CreationTime"));
                    send.put("user_id", received.get("UserId"));
                    send.put("operation_type", received.get("Operation"));
                    send.put("app_name", received.get("Workload"));
                    keywords.add(send.toString());
                    // apply regex to value and for each match add it to keywords

                } catch (Exception e) {
                    // TODO: handle exception
                    System.err.println("Unable to convert to json");
                    e.printStackTrace();
                }

                return keywords;
            }
        }).to("user_activity_by_date");

In this code, I want to check the operation type and, depending on it, send each record to the relevant topic.

How can I achieve this?

EDIT:

I have updated my code to this:

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch(
    (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
    (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
    (key, value) -> true
);

branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
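
For reference, branch evaluates its predicates in order and places each record in the first branch whose predicate matches; a record that matches none is dropped, which is why the final (key, value) -> true acts as a catch-all. Below is a minimal sketch of that first-match rule in plain Java, with no Kafka dependency. The class FirstMatchBranch and its methods are hypothetical names used only for illustration; they are not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper that mimics the first-match semantics of KStream#branch
// on an in-memory list. It is an illustration, not a Kafka Streams API.
final class FirstMatchBranch {

    // Returns one bucket per predicate. Each value lands in the first bucket
    // whose predicate accepts it and is dropped if no predicate matches.
    static List<List<String>> branch(List<String> values, List<Predicate<String>> predicates) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (String value : values) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    buckets.get(i).add(value);
                    break; // first match wins, later predicates are not tried
                }
            }
        }
        return buckets;
    }

    // The same three predicates as in the topology above, in the same order.
    static List<Predicate<String>> o365Predicates() {
        List<Predicate<String>> predicates = new ArrayList<>();
        predicates.add(v -> v.contains("Operation\":\"SharingSet") && v.contains("ItemType\":\"File"));
        predicates.add(v -> v.contains("Operation\":\"AddedToSecureLink") && v.contains("ItemType\":\"File"));
        predicates.add(v -> true); // catch-all for everything else
        return predicates;
    }
}
```

Because the predicates rely on exact substrings, a record with whitespace after the colon in its JSON would fall through to the catch-all bucket.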
el323 asked Feb 23 '18 14:02

1 Answer

You can use the branch method to split your stream. It takes one or more predicates and returns one output stream per predicate, each containing the records that matched that predicate first.

The code below is taken from kafka-streams-examples:

KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

forks[0].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

forks[1].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
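
Applied to the question, the same idea means deriving a destination topic from the Operation field of each record. The sketch below keeps that decision in one plain-Java function with no Kafka dependency. The class TopicRouter, the method topicFor, and the regex-based field extraction are assumptions made for illustration (a real application would more likely parse the JSON with a proper library); the topic names are the ones from the question's edit. In Kafka Streams 2.0 and later, a function like this could in principle be used inside a TopicNameExtractor passed to KStream#to instead of branching, provided the target topics already exist.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical router: picks an output topic name from a raw JSON string.
// The regexes are a simplification and only handle flat string fields.
final class TopicRouter {
    private static final Pattern OPERATION =
            Pattern.compile("\"Operation\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern ITEM_TYPE =
            Pattern.compile("\"ItemType\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the captured field value, or an empty string if the field is absent.
    static String field(Pattern pattern, String json) {
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? matcher.group(1) : "";
    }

    // Checks the rules in order, falling back to the general activity topic.
    static String topicFor(String json) {
        String operation = field(OPERATION, json);
        boolean isFile = "File".equals(field(ITEM_TYPE, json));
        if (isFile && "SharingSet".equals(operation)) {
            return "o365_sharing_set_by_date";
        }
        if (isFile && "AddedToSecureLink".equals(operation)) {
            return "o365_added_to_secure_link_by_date";
        }
        return "o365_user_activity_by_date";
    }
}
```

Unlike a substring check, matching on the extracted field value tolerates whitespace around the colon and does not confuse an operation name that merely starts with SharingSet.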
codejitsu answered Sep 25 '22 20:09