
Apache Camel Kafka - aggregate Kafka messages and publish to a different topic at regular intervals

I have a use case:

I need to read and aggregate messages from a Kafka topic at regular intervals and publish the result to a different topic. Local storage is not an option. This is how I am planning to address it; any suggestions for improvement are welcome.

To schedule the aggregation and publishing of the Kafka messages, I am planning to use the completionInterval option of the Aggregator EIP. Here is the code:

  @Autowired ObjectMapper objectMapper;
  JacksonDataFormat jacksonDataFormat;

  // Build the data format once the ObjectMapper has been injected,
  // so incoming JSON is unmarshalled into EventMessage instances.
  @PostConstruct
  public void initialize() {
    //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper, EventMessage.class);
  }

and the route:

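public void configure() throws Exception {
    // Consume item events from Kafka; offsets are auto-committed every 25 s.
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            // Deserialize the JSON payload into an EventMessage.
            .unmarshal(jacksonDataFormat)
            // Group by item ID and complete the current aggregates every 20 s.
            .aggregate(body().method("getItemId"), new EventAggregationStrategy())
                .completionInterval(20000)
            // Serialize the aggregate back to JSON and publish it to the output topic.
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
  }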
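
The route refers to an EventAggregationStrategy that is not shown. As a rough illustration only, the merge step such a strategy would perform can be sketched in plain Java, without any Camel types. The names ItemEvent and ItemAggregate and the payload field are hypothetical; the question only tells us that EventMessage exposes getItemId(). The null check mirrors the Camel contract that the "old" exchange is null for the first message of a correlation group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for EventMessage; only getItemId() is known from the question.
final class ItemEvent {
    private final String itemId;
    private final String payload;

    ItemEvent(String itemId, String payload) {
        this.itemId = itemId;
        this.payload = payload;
    }

    String getItemId() { return itemId; }
    String getPayload() { return payload; }
}

// Hypothetical aggregate: everything seen for one item ID within one interval.
final class ItemAggregate {
    private final String itemId;
    private final List<String> payloads = new ArrayList<>();

    private ItemAggregate(String itemId) {
        this.itemId = itemId;
    }

    // Same shape as an aggregation strategy's merge step:
    // 'current' is null for the first event of a group.
    static ItemAggregate merge(ItemAggregate current, ItemEvent incoming) {
        ItemAggregate result =
                (current != null) ? current : new ItemAggregate(incoming.getItemId());
        result.payloads.add(incoming.getPayload());
        return result;
    }

    String getItemId() { return itemId; }
    int size() { return payloads.size(); }
    List<String> getPayloads() { return Collections.unmodifiableList(payloads); }
}
```

In a real strategy, the aggregate(oldExchange, newExchange) method would presumably read the bodies from the two exchanges, call something like merge, and set the result as the body of the returned exchange.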
asked Apr 28 '17 06:04 by so-random-dude




1 Answer

This looks OK. Things to keep in mind:

  • What happens if the JVM dies halfway through an aggregation cycle? If you don't care, fine. Otherwise you may want to investigate a persistent AggregationRepository to store and replay the in-flight messages, although you can also replay the lost messages from Kafka. This would be my biggest operational concern.
  • Following on from that, think about runtime control. Camel is poor at telling you clearly what is going on at runtime. Something like a runaway method in your aggregator (e.g. a very greedy regex) will leave you with little idea of the current state of the aggregated exchanges, and JMX probably won't tell you much about what is happening.
  • I would use an AggregateController so that you can force completion of the exchange externally. For example, you can issue a shutdown to Camel and then call the controller to complete the in-flight exchange.
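
The first and third points both come down to what happens to groups that are still open when the process stops. The following is a plain-Java model of that idea, not Camel code: an in-memory buffer keyed by item ID whose completeAll() method can be called both by the interval timer and by a shutdown path. The class name IntervalBuffer and its methods are made up for this sketch. In Camel itself, the equivalent hook would be an AggregateController registered on the aggregate EIP and its forceCompletionOfAllGroups() method; check the exact usage against the Camel version you run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of an interval aggregator with an external "force completion" hook.
final class IntervalBuffer {
    // Open groups, keyed by item ID; everything here is lost if the JVM dies.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // Receives each completed group, e.g. a producer writing to the output topic.
    private final BiConsumer<String, List<String>> publisher;

    IntervalBuffer(BiConsumer<String, List<String>> publisher) {
        this.publisher = publisher;
    }

    // Add one message to the open group for its item ID.
    synchronized void add(String itemId, String payload) {
        groups.computeIfAbsent(itemId, k -> new ArrayList<>()).add(payload);
    }

    // Publish and clear every open group; returns how many groups were completed.
    // Intended to be called on each interval tick and once more before shutdown.
    synchronized int completeAll() {
        int completed = 0;
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            publisher.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
            completed++;
        }
        groups.clear();
        return completed;
    }

    // Number of groups that would be lost if the process died right now.
    synchronized int pendingGroups() {
        return groups.size();
    }
}
```

Calling completeAll() from a shutdown hook covers an orderly stop only. A hard JVM crash still loses whatever pendingGroups() reports, which is why the persistent repository or a replay from Kafka matters for that case.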
answered Sep 21 '22 13:09 by stringy05
