
How to filter out unnecessary records before materializing GlobalKTable?

With Kafka Streams, I always initialize my store from compacted reference topics using this code:

builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

I would like to filter the kafkaTopic topic before making the store, in order to eliminate some unnecessary merchants.

Something like this:

GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
         .map(MerchantAvro::getDeletionDate)
         .isPresent());
...

But it's not possible to apply a filter method on a GlobalKTable.

How can I do this filtering?

EGRA asked Oct 17 '22 17:10



2 Answers

You will need to filter the topic first and write the result into another topic. Then, you can consume the second topic as a GlobalKTable.

As an alternative, you might be able to use a "global store" instead of a GlobalKTable. For this case, you can provide a custom Processor that can implement a filter before you populate the global store. See Defining a Stream Processor.

Global stores are also local. The difference is that for a "regular" store the data is partitioned, i.e., each store contains different data, while for global stores each instance loads all the data (i.e., the data is replicated). Thus, each member of the group has its own copy of the global store data.
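To make the "filter before you populate the store" idea concrete, here is a minimal plain-Java model of the logic such a processor would apply. It uses no Kafka classes at all: `Merchant`, `record` and `materialize` are hypothetical names, and `Merchant` is only a stand-in for `MerchantAvro`. Treating a rejected record like a tombstone (so that an earlier accepted version of the key is removed) is an assumption of this sketch; whether you want that depends on your data.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of "filter before populating the store"; no Kafka classes involved.
class FilteredStoreSketch {

    // Hypothetical stand-in for MerchantAvro: only the field the filter needs.
    static final class Merchant {
        final String name;
        final String deletionDate; // null means the merchant is still active

        Merchant(String name, String deletionDate) {
            this.name = name;
            this.deletionDate = deletionDate;
        }
    }

    // Small helper to build one changelog record (key plus possibly-null value).
    static Map.Entry<String, Merchant> record(String key, Merchant value) {
        return new SimpleEntry<>(key, value);
    }

    // Replays a compacted changelog in order and returns the resulting store.
    // A null value is a tombstone; a record rejected by `keep` is handled the same
    // way (assumption of this sketch), so a stale version of the key does not linger.
    static Map<String, Merchant> materialize(List<Map.Entry<String, Merchant>> changelog,
                                             BiPredicate<String, Merchant> keep) {
        Map<String, Merchant> store = new LinkedHashMap<>();
        for (Map.Entry<String, Merchant> rec : changelog) {
            Merchant value = rec.getValue();
            if (value != null && keep.test(rec.getKey(), value)) {
                store.put(rec.getKey(), value);
            } else {
                store.remove(rec.getKey());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Merchant>> changelog = Arrays.asList(
                record("m1", new Merchant("Alpha", null)),
                record("m2", new Merchant("Beta", null)),
                record("m2", new Merchant("Beta", "2022-10-01")), // m2 is deleted later
                record("m3", null));                              // tombstone
        Map<String, Merchant> store =
                materialize(changelog, (key, merchant) -> merchant.deletionDate == null);
        System.out.println(store.keySet()); // prints [m1]
    }
}
```

The same predicate would sit in the custom processor (or in the job that writes the second topic). The point of the `else` branch is that simply dropping rejected records would leave `m2` in the store with its old, active version.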

Matthias J. Sax answered Oct 21 '22 07:10



I made a "streamer" that transforms a person into a customer. This is the topology:

    journal.info("Open topic {}...", kafkaTopic);
    StreamsBuilder builder = new StreamsBuilder();
    Topology topology = builder.build();
    topology.addSource("person$", kafkaTopic)
            .addProcessor("selection", PersonProcessor::new, "person$")
            .addSink("customer$", customerTopic, "selection");
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));

This is the processor:

public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
    Logger journal = LoggerFactory.getLogger(PersonProcessor.class);

    @Override
    public void process(String key, PersonAvro avroPerson) {
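        // Guard against tombstones (null values), which a compacted topic can contain
        journal.debug("processing record: {}, {}", key, avroPerson == null ? null : avroPerson.getActive());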
        Optional.ofNullable(avroPerson)
                .filter(person -> Optional.ofNullable(person)
                        .map(PersonAvro::getActive)
                        .filter(activation -> !activation.matches("0"))
                        .isPresent())
                .map(person -> CustomerAvro.newBuilder()
                        .setId(person.getId())
                        .setCompName(person.getCompName())
                        .setSiretCode(person.getSiretCode())
                        .setActive(person.getActive())
                        .setAdd3(person.getAdd3())
                        .setAdd4(person.getAdd4())
                        .setAdd5(person.getAdd5())
                        .setAdd6(person.getAdd6())
                        .setAdd7(person.getAdd7()))
                .map(CustomerAvro.Builder::build)
                .ifPresent(customer -> {
                    context().forward(key, customer);
                    context().commit();
                });
    }
}

And another streamer that loads a local store from a GlobalKTable:

@PostConstruct
private void init() throws InterruptedException {
    configurer();
    journal.info("Open topic {}...", kafkaTopic);
    StreamsBuilder builder = new StreamsBuilder();
    builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    customerStore = waitUntilStoreIsQueryable("customerStore", streams);
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));
}

It is then able to answer synchronous requests:

public Optional<CustomerDto> getClient(int idCoclico) {
    journal.debug(Markers.append("idCoclico", idCoclico), "Looking up a COCLICO customer");
    // Look up the customer in the cache
    Optional<CustomerDto> optClient = Optional.ofNullable(idCoclico)
            .map(String::valueOf)
            .map(customerStore::get)
            .map(avroCustomer -> {
                journal.debug(Markers.append("idCoclico", idCoclico),
                        "The customer exists in the local store and is not inactive");
                CustomerDto client = new CustomerDto(avroCustomer.getId());
                client.setCompName(avroCustomer.getCompName());
                client.setSiretCode(avroCustomer.getSiretCode());
                client.setAdd3(avroCustomer.getAdd3());
                client.setAdd4(avroCustomer.getAdd4());
                client.setAdd5(avroCustomer.getAdd5());
                client.setAdd6(avroCustomer.getAdd6());
                client.setAdd7(avroCustomer.getAdd7());
                Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
                        .map(String::trim)
                        .filter(add -> !add.isEmpty());
                // If the address is set in COCLICO
                if (optAdd.isPresent())
                    client.setCountryCode(avroCustomer.getCountryCode());
                // French addresses are not filled in
                else
                    client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
                return client;
            });
    if (!optClient.isPresent())
        journal.info(Markers.append("idCoclico", idCoclico), "The customer does not exist in the local store");
    return optClient;
}

The first tests are OK. I'll try to deploy this to our build environments...

EGRA answered Oct 21 '22 07:10
