With Kafka Streams, I always initialize my store from compacted reference topics using this code:
builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
I would like to filter the kafkaTopic topic before building the store, in order to exclude merchants I don't need.
Something like this:
GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
.map(MerchantAvro::getDeletionDate)
.isPresent());
...
But it's not possible to apply a filter method on a GlobalKTable.
How can I do this filtering?
You will need to filter the topic first and write the result into another topic. Then, you can consume the second topic as a GlobalKTable.
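One detail matters when the source is a compacted topic: a merchant that passed the filter earlier and later receives a deletion date has to be removed from the second topic as well, which means writing a null value (a tombstone) for its key instead of silently dropping the record. Here is a minimal plain-Java sketch of that per-record decision, without the Kafka API. The Merchant class is a hypothetical stand-in for MerchantAvro, and the map stands in for the table built from the second topic; both are assumptions for illustration only.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

public class FilteredTopicSketch {

    /** Hypothetical stand-in for MerchantAvro: only the field the filter needs. */
    public static final class Merchant {
        final String name;
        final LocalDate deletionDate; // null means the merchant is still active

        public Merchant(String name, LocalDate deletionDate) {
            this.name = name;
            this.deletionDate = deletionDate;
        }
    }

    /**
     * Value to write to the filtered topic for one input record.
     * Returns the merchant when it should be kept, or null (a tombstone)
     * when the key must be removed from the table built on the filtered topic.
     */
    public static Merchant toFilteredValue(Merchant value) {
        if (value == null || value.deletionDate != null) {
            return null;
        }
        return value;
    }

    /** Applies one input record the way a table over the filtered topic would see it. */
    public static void apply(Map<String, Merchant> table, String key, Merchant value) {
        Merchant filtered = toFilteredValue(value);
        if (filtered == null) {
            table.remove(key); // tombstone: delete the key if it was present
        } else {
            table.put(key, filtered);
        }
    }

    public static void main(String[] args) {
        Map<String, Merchant> table = new LinkedHashMap<>();
        apply(table, "m1", new Merchant("Acme", null));
        apply(table, "m2", new Merchant("Globex", LocalDate.of(2019, 1, 31)));
        apply(table, "m3", new Merchant("Initech", null));
        // m1 is updated later with a deletion date, so it must disappear again
        apply(table, "m1", new Merchant("Acme", LocalDate.of(2019, 6, 30)));
        System.out.println(table.keySet()); // prints [m3]
    }
}
```

In an actual topology, the same decision would sit in the step that writes to the second topic.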
As an alternative, you might be able to use a "global store" instead of a GlobalKTable. For this case, you can provide a custom Processor that can implement a filter before you populate the global store. See Defining a Stream Processor.
Global stores are also local. The difference is that for a "regular store" the data is partitioned, i.e., each store contains different data, while for global stores each instance loads all data (i.e., the data is replicated). Thus, each member of the group has its own copy of the global store data.
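The difference can be illustrated with a small plain-Java simulation. This is only a sketch: the class and method names are hypothetical, and the owner of a key is chosen with String.hashCode() as a simplification, which is not how Kafka assigns partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StoreDistributionSketch {

    /** Creates one empty local store per application instance. */
    private static List<Map<String, String>> newStores(int instances) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            stores.add(new TreeMap<String, String>());
        }
        return stores;
    }

    /** Regular store: each record lands only on the instance that owns its key. */
    public static List<Map<String, String>> partitioned(Map<String, String> records, int instances) {
        List<Map<String, String>> stores = newStores(instances);
        for (Map.Entry<String, String> e : records.entrySet()) {
            // Simplified ownership rule, assumed for illustration only
            int owner = Math.floorMod(e.getKey().hashCode(), instances);
            stores.get(owner).put(e.getKey(), e.getValue());
        }
        return stores;
    }

    /** Global store: every instance loads every record. */
    public static List<Map<String, String>> global(Map<String, String> records, int instances) {
        List<Map<String, String>> stores = newStores(instances);
        for (Map<String, String> store : stores) {
            store.putAll(records);
        }
        return stores;
    }

    public static void main(String[] args) {
        Map<String, String> records = new TreeMap<>();
        records.put("m1", "Acme");
        records.put("m2", "Globex");
        records.put("m3", "Initech");
        System.out.println("regular: " + partitioned(records, 2));
        System.out.println("global:  " + global(records, 2));
    }
}
```

With the regular layout, a lookup for a given key only succeeds on the instance that owns it; with the global layout, any instance can answer.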
I made a "streamer" that transforms a person into a customer. This is the topology:
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
topology.addSource("person$", kafkaTopic)
.addProcessor("selection", PersonProcessor::new, "person$")
.addSink("customer$", customerTopic, "selection");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
This is the processor:
public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
Logger journal = LoggerFactory.getLogger(PersonProcessor.class);
@Override
public void process(String key, PersonAvro avroPerson) {
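// A compacted topic can deliver tombstones (null values), so guard before dereferencing
journal.debug("Processing record: {}, {}", key, avroPerson == null ? null : avroPerson.getActive());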
Optional.ofNullable(avroPerson)
.filter(person -> Optional.ofNullable(person)
.map(PersonAvro::getActive)
.filter(activation -> !activation.matches("0"))
.isPresent())
.map(person -> CustomerAvro.newBuilder()
.setId(person.getId())
.setCompName(person.getCompName())
.setSiretCode(person.getSiretCode())
.setActive(person.getActive())
.setAdd3(person.getAdd3())
.setAdd4(person.getAdd4())
.setAdd5(person.getAdd5())
.setAdd6(person.getAdd6())
.setAdd7(person.getAdd7()))
.map(CustomerAvro.Builder::build)
.ifPresent(customer -> {
context().forward(key, customer);
context().commit();
});
}
}
And another streamer that loads a local store from a GlobalKTable:
@PostConstruct
private void init() throws InterruptedException {
configurer();
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
customerStore = waitUntilStoreIsQueryable("customerStore", streams);
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
}
It is then able to respond to synchronous requests:
public Optional<CustomerDto> getClient(int idCoclico) {
journal.debug(Markers.append("idCoclico", idCoclico), "Looking up a COCLICO customer");
// Look up the customer in the cache
Optional<CustomerDto> optClient = Optional.ofNullable(idCoclico)
.map(String::valueOf)
.map(customerStore::get)
.map(avroCustomer -> {
journal.debug(Markers.append("idCoclico", idCoclico),
"The customer exists in the local store and is not inactive");
CustomerDto client = new CustomerDto(avroCustomer.getId());
client.setCompName(avroCustomer.getCompName());
client.setSiretCode(avroCustomer.getSiretCode());
client.setAdd3(avroCustomer.getAdd3());
client.setAdd4(avroCustomer.getAdd4());
client.setAdd5(avroCustomer.getAdd5());
client.setAdd6(avroCustomer.getAdd6());
client.setAdd7(avroCustomer.getAdd7());
Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
.map(String::trim)
.filter(add -> !add.isEmpty());
// If the address is set in COCLICO
if (optAdd.isPresent())
client.setCountryCode(avroCustomer.getCountryCode());
// French addresses are not filled in
else
client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
return client;
});
if (!optClient.isPresent())
journal.info(Markers.append("idCoclico", idCoclico), "The customer does not exist in the local store");
return optClient;
}
The first tests are OK. I'll try to deploy this on our build environments...