 

Is it possible to ReKey a GlobalKTable?

I want to ReKey a GlobalKTable (probably while initializing it, as I believe they are read-only once created).

Is this possible?

I have two topics that I am working with in a Spring/Java Kafka Streams app. The first is not compacted, the second is. Both use Avro for their keys and values.

The app streams records from the first (non-compacted) topic and attaches additional data from the compacted topic via KStream#leftJoin. The compacted topic has been brought into the app as a GlobalKTable, created via StreamsBuilder#globalTable(), and it needs to stay this way (I need every record from all partitions of the topic available in each instance of the app).
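The lookup described above can be modeled in plain Java to show why the key matters: for every stream record, the KeyValueMapper produces a key, and that key is matched against the GlobalKTable's own primary key, so a mapper that returns anything other than the table's key never finds a match. This is a simplified, in-memory sketch, not the Kafka Streams API; `GlobalJoinModel`, `StreamRecord`, and `leftJoin` are hypothetical names, and details such as null-key and null-value handling are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class GlobalJoinModel {
    // A stream record; a hypothetical stand-in for the KStream's Avro key/value pairs.
    record StreamRecord<K, V>(K key, V value) {}

    // Simplified model of a KStream-GlobalKTable left join: the mapper derives a
    // lookup key from each stream record, and that key is looked up against the
    // table's own primary key. A miss yields a null table value.
    static <K, V, GK, GV, R> List<StreamRecord<K, R>> leftJoin(
            List<StreamRecord<K, V>> stream,
            Map<GK, GV> globalTable,
            BiFunction<K, V, GK> keyMapper,
            BiFunction<V, GV, R> joiner) {
        List<StreamRecord<K, R>> joined = new ArrayList<>();
        for (StreamRecord<K, V> r : stream) {
            GV tableValue = globalTable.get(keyMapper.apply(r.key(), r.value()));
            // the output keeps the stream record's key, as KStream joins do
            joined.add(new StreamRecord<>(r.key(), joiner.apply(r.value(), tableValue)));
        }
        return joined;
    }
}
```

With a table `Map.of("k1", "T1")`, a stream record whose mapper returns `"k1"` is joined with `T1`, while a record whose mapper returns any other key comes out with a `null` table value, which is what a left join produces for a miss (an inner join would drop that record instead).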

I know there is talk of supporting non-primary-key joins (https://issues.apache.org/jira/browse/KAFKA-3705), but to my knowledge, I can't do this yet...

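import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

  @Autowired
  private MyCustomSerdes serdes;

  @Bean
  public KStream<AvroKeyTwo, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

    // compacted topic, read in full by every instance of the app
    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne(),
      serdes.getAvroValueOne()
    ));

    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));

    return kStream.join(
      globalTable,
      // The KeyValueMapper. It has to return the table's key type (AvroKeyOne),
      // so I need to rekey the global table to the corresponding String (which its
      // data contains) if I want this join to return results.
      (streamKey, streamValue) -> streamKey.getNewStringKey(),
      // placeholder ValueJoiner (details omitted)
      (streamValue, tableValue) -> streamValue
    );
  }

}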
Asked by Casey, Apr 17 '19 at 00:04




1 Answer

> I want to ReKey a GlobalKTable (probably while initializing it, as I believe they are read-only once created).
>
> Is this possible?

There is no direct support for this today. You already mentioned upcoming work such as adding support to global tables for non-primary-key joins, but this is not available yet.

What you could do today: you can re-key (re-partition) the original Kafka topic into a new topic, and then read the re-keyed topic into your GlobalKTable. Maybe this is an option for you.
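The re-keying step can be sketched in the same spirit. In Kafka Streams it would typically be a `selectKey(...)` on a stream of the original topic followed by `to(...)` into a new topic, which `StreamsBuilder#globalTable()` then reads; the in-memory model below only shows what the resulting compacted topic ends up holding. `RekeyModel`, `SourceRecord`, and `rekey` are hypothetical names, and the new key is assumed to be derivable from each value, since the question says the table's data contains the corresponding String.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class RekeyModel {
    // One record of the compacted source topic; a hypothetical stand-in for the Avro key/value pair.
    record SourceRecord<K, V>(K key, V value) {}

    // Re-keys a changelog by a key derived from each value. Later records with the
    // same derived key overwrite earlier ones, mimicking what a compacted re-keyed
    // topic (and a GlobalKTable reading it) ends up holding.
    static <K, V, NK> Map<NK, V> rekey(List<SourceRecord<K, V>> changelog, Function<V, NK> newKey) {
        Map<NK, V> rekeyed = new LinkedHashMap<>();
        for (SourceRecord<K, V> r : changelog) {
            if (r.value() == null) {
                // tombstone: there is no value to derive a new key from, so it is skipped
                continue;
            }
            rekeyed.put(newKey.apply(r.value()), r.value());
        }
        return rekeyed;
    }
}
```

One caveat of this approach: if a source record's derived key can change over time, or the source record is deleted with a tombstone, the entry under the old derived key stays in the re-keyed topic. Neither this sketch nor a plain re-key removes it, so the GlobalKTable built from the new topic may keep stale entries.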

Answered by Michael G. Noll, Sep 24 '22 at 07:09