I have a use case:
I need to read and aggregate messages from a Kafka topic at regular intervals and publish the result to a different topic. Local storage is not an option. This is how I am planning to address it; any suggestions for improvement are welcome.
To schedule the aggregation and publishing of the Kafka messages, I am planning to use the completionInterval option of the Aggregator EIP. Here is the code.
    @Autowired
    ObjectMapper objectMapper;

    JacksonDataFormat jacksonDataFormat;

    // Build the data format once the ObjectMapper has been injected.
    @PostConstruct
    public void initialize() {
        //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        jacksonDataFormat = new JacksonDataFormat(objectMapper, EventMessage.class);
    }
and the route:
    public void configure() throws Exception {
        from("kafka:localhost:9092?topic=item-events"
                + "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            // Group events by item id and complete all current groups every 20 seconds.
            .aggregate(body().method("getItemId"), new EventAggregationStrategy())
                .completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated"
                + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
    }
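The route refers to EventAggregationStrategy and EventMessage without showing them. As a rough sketch of the merge step such a strategy might delegate to, here is a plain-Java version with no Camel dependency. Everything except getItemId() is an assumption made for illustration: the payloads field, the EventMerger class, and the choice to combine events by concatenating payloads are hypothetical. The null check mirrors the AggregationStrategy contract, where the old exchange is null for the first message of a group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventMessage; only getItemId() is taken from the route.
final class EventMessage {
    private final String itemId;
    private final List<String> payloads; // assumed field, for illustration only

    EventMessage(String itemId, List<String> payloads) {
        this.itemId = itemId;
        this.payloads = new ArrayList<>(payloads); // defensive copy
    }

    String getItemId() {
        return itemId;
    }

    List<String> getPayloads() {
        return payloads;
    }
}

// Hypothetical helper holding the merge logic an aggregation strategy could call.
final class EventMerger {
    // oldEvent is null when newEvent is the first message of its group.
    static EventMessage merge(EventMessage oldEvent, EventMessage newEvent) {
        if (oldEvent == null) {
            return newEvent;
        }
        List<String> combined = new ArrayList<>(oldEvent.getPayloads());
        combined.addAll(newEvent.getPayloads());
        return new EventMessage(oldEvent.getItemId(), combined);
    }
}
```

Inside a real strategy, the aggregate method would presumably read both bodies as EventMessage, call something like EventMerger.merge, and set the result as the body of the exchange it returns.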
Kafka for event streaming and Camel for ETL: Camel is used in a business domain for application integration. Kafka is the central nervous system between the Camel integration application and many other applications.
Enrich EIP - Uses a Producer to obtain the additional data. It is usually used for Request Reply messaging, for instance to invoke an external web service. Poll Enrich EIP - Uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance to read a file or download a file over FTP.
The ConsumerTemplate interface allows you to receive message exchanges from endpoints in a variety of different ways to make it easy to work with Camel Endpoint instances from Java code.
This looks ok. Things to keep in mind:
PersistentAggregationRepository: to store/replay messages, although you can replay the messages you lost from Kafka (this would be my biggest operational concern).
AggregateController: to let you force completion of the exchange externally, so you can do things like issue a shutdown to Camel and then call this to complete the in-flight exchange.
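To make the shutdown concern concrete, here is a small plain-Java model of interval-based completion with an externally triggered flush. It is not Camel code and not how Camel implements the aggregator; IntervalAggregator and its methods are hypothetical names. The idea it illustrates is the same one the controller addresses: groups held in memory are published when a timer fires, and the same completion call can be made once more before the process stops, so nothing is left in flight.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java model (not Camel): buffers values per key in memory and
// publishes every group when completion is triggered.
final class IntervalAggregator {
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final BiConsumer<String, List<String>> publisher;

    IntervalAggregator(BiConsumer<String, List<String>> publisher) {
        this.publisher = publisher;
    }

    // Called for every incoming message.
    synchronized void add(String key, String value) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Called by a timer on every interval, and once more before shutdown.
    // Returns the number of groups that were published.
    synchronized int completeAllGroups() {
        int completed = groups.size();
        groups.forEach(publisher);
        groups.clear();
        return completed;
    }
}
```

In this model a ScheduledExecutorService could call completeAllGroups() every 20 seconds, and a shutdown hook could call it one last time. In Camel, as far as I know, the equivalent is to register a controller on the aggregate definition and call its forceCompletionOfAllGroups() before stopping the context. Anything still buffered when the process dies without that call is lost unless a persistent repository is used or the messages are replayed from Kafka.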