Send message to different Kafka topics based on configuration

I would like to send data to different Kafka topics based on configuration:

ResponseFactory processingPeply = null;

        switch(endpointType)
        {
            case "email":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "sms":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "network":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
              
            default:
                processingPeply = ResponseFactory.builder().status("error").build();
        } 

I currently get:

  • Variable 'record' is already defined in the scope
  • Variable 'sendResult' is already defined in the scope
  • Variable 'consumerRecord' is already defined in the scope

Do you know how I can redesign the code in some better way so I can solve the issue?

Peter Penzov asked Mar 02 '23 17:03


1 Answer

Here are four possible approaches that keep the repeated switch branches out of the core code and honor the DRY principle of not duplicating code. (DRY is a much broader concept than just not repeating code.)
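As for the compiler errors themselves: all `case` labels of a `switch` share a single block scope, so a local variable declared under one label is still in scope under the next one. The smallest possible fix is to wrap each case body in braces. A minimal sketch of that idea follows; `CaseScopeDemo` and `describe` are illustrative names, and plain strings stand in for the Kafka calls.

```java
// Illustrative only: plain strings stand in for the Kafka request/reply calls.
final class CaseScopeDemo {

    static String describe(String endpointType) {
        String result;
        switch (endpointType) {
            case "email": {
                // 'topic' is local to this braced block
                String topic = "tp-email.request";
                result = "sent to " + topic;
                break;
            }
            case "sms": {
                // same name again: legal, because the previous block has ended
                String topic = "tp-sms.request";
                result = "sent to " + topic;
                break;
            }
            default:
                result = "error";
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(describe("email"));
        System.out.println(describe("fax"));
    }
}
```

This makes the code compile but keeps the duplication, which is what the approaches below try to remove.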


1- GeneralHandler and endpoint type children

The idea is a class hierarchy in which each endpoint type extends an abstract, general parent.

                      [GeneralKafkaHandler] - core/common logic
               _______________ | ________________
              |                |                |
              v                v                v
         {SmsHandler}    {EmailHandler}   {NetworkHandler}  -- specific params/methods

For example, getTopic() and getFuture() could be abstract in the parent and implemented by each child with its own logic. Alternatively, getKafkaTemplate() could be the abstract method (choose either getFuture() or getKafkaTemplate()). The code below is a simplified version of the hierarchy in which the topic is passed in through the constructor.

Abstract parent

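import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;

abstract class GeneralKafkaHandler 
{
   //each child decides which template sends the record
   public abstract RequestReplyFuture<String, Object, Object> 
                   getFuture(ProducerRecord<String, Object> r);
   public abstract String getName();

   protected String topic;
   protected int id;
   ResponseFactory processingPeply = null;

   public GeneralKafkaHandler(String topic, int id) 
   {
       this.topic = topic; 
       this.id = id;
   }

   //the main/common logic is implemented here;
   //both get(...) calls block and throw checked exceptions
   public void handle(Object tf) 
           throws InterruptedException, ExecutionException, TimeoutException
   {
       ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
       RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
       //wait for the send to complete, then for the reply
       SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
       ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
       processingPeply = (ResponseFactory) consumerRecord.value();
   }

   //...
}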

SmsKafkaHandler

class SmsKafkaHandler extends GeneralKafkaHandler 
{
   //Sms specific variables, methods,..
    
   public SmsKafkaHandler(String topic, int id) 
   {
      super(topic, id);
      //sms code
   }

   @Override
   public String getName() 
   {
      return "SMSHandler_"+topic+"_"+id;
   }

   @Override
   public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r)
   {
      //sms code
      return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
   }

   //...
}

Main (just an example)

Map<String, GeneralKafkaHandler> handlerMap = new HashMap<>();
handlerMap.put("sms", new SmsKafkaHandler("tp-sms.request",1));
handlerMap.put("smsplus", new SmsKafkaHandler("tp-sms-plus.request",2));
handlerMap.put("email", new EmailKafkaHandler("tp-email.request",1));
//...

handlerMap.get(endpointType.toLowerCase()).handle(tf);

There are different options here. For example, sendAndReceive is also common to all types, so getFuture() could be replaced by a getTemplate() method. There are many variations to play with.
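A rough sketch of that getTemplate() variant, written so that it runs without Kafka. Everything here is an assumption made for illustration: `ReplyTemplate` is a hypothetical stand-in for ReplyingKafkaTemplate (with a simplified `sendAndReceive` that returns a String directly), and the class names `TemplateBasedHandler`, `SmsTemplateHandler`, `EmailTemplateHandler` and `TemplateHandlerDemo` are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ReplyingKafkaTemplate so the sketch runs without Kafka.
interface ReplyTemplate {
    String sendAndReceive(String topic, String payload);
}

abstract class TemplateBasedHandler {
    protected final String topic;

    protected TemplateBasedHandler(String topic) {
        this.topic = topic;
    }

    // The only endpoint-specific piece each child has to supply.
    protected abstract ReplyTemplate getTemplate();

    // Common logic, written once in the parent.
    public String handle(String payload) {
        return getTemplate().sendAndReceive(topic, payload);
    }
}

class SmsTemplateHandler extends TemplateBasedHandler {
    SmsTemplateHandler(String topic) {
        super(topic);
    }

    @Override
    protected ReplyTemplate getTemplate() {
        // A real handler would return its own ReplyingKafkaTemplate here.
        return (t, p) -> "sms reply to '" + p + "' via " + t;
    }
}

class EmailTemplateHandler extends TemplateBasedHandler {
    EmailTemplateHandler(String topic) {
        super(topic);
    }

    @Override
    protected ReplyTemplate getTemplate() {
        return (t, p) -> "email reply to '" + p + "' via " + t;
    }
}

final class TemplateHandlerDemo {
    static Map<String, TemplateBasedHandler> handlers() {
        Map<String, TemplateBasedHandler> map = new HashMap<>();
        map.put("sms", new SmsTemplateHandler("tp-sms.request"));
        map.put("email", new EmailTemplateHandler("tp-email.request"));
        return map;
    }

    public static void main(String[] args) {
        System.out.println(handlers().get("sms").handle("hello"));
    }
}
```

With this shape the children shrink to a constructor plus one getter, and the send/wait logic lives only in the parent.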

This approach is a good idea if you need or wish to manage each endpoint in more depth. Consider it if you think the separate handling is worth it now, or will be in the future: since the core mechanism is the same, each extension lets you quickly implement a new endpoint type.


2- Custom entity

In essence, only two things differ between endpoint types:

  1. Topic
  2. ReplyingKafkaTemplate

You could wrap them into a single object. For example:

public class TopicEntity
{
  public final String topic;
  public final ReplyingKafkaTemplate<String,Object,Object> template;

  public TopicEntity(String topic, ReplyingKafkaTemplate<String,Object,Object> template)
  {
     this.topic = topic;
     this.template = template;
  }    
}

You can then do the following without modifying your current code (assuming your templates are already initialized):

TopicEntity smsE = new TopicEntity("tp-sms.request",
                                   processingTransactionSmsReplyKafkaTemplate);
TopicEntity mailE = new TopicEntity("tp-email.request",
                                   processingTransactionEmailReplyKafkaTemplate);

Map<String, TopicEntity> handlerMap = new HashMap<>();
handlerMap.put("sms", smsE);
handlerMap.put("email",mailE);
//...

TopicEntity te = handlerMap.get(endpointType.toLowerCase()); 
//Based on endpoint
ProducerRecord<String, Object> record = new ProducerRecord<>(te.topic, tf);
RequestReplyFuture<String, Object, Object> rf = te.template.sendAndReceive(record);
//Common regardless of endpoint
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();

Pretty simple, and it also avoids duplicate code. The entity would also let you define specific characteristics for each endpoint.
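One thing to watch with a map lookup: `handlerMap.get(...)` returns null for an endpoint type that was never registered, whereas the question's switch had a default branch that produced an error status. A small sketch of one way to keep that behavior, using `Optional`. The names `EndpointLookupDemo`, `Entry`, `find` and `statusFor` are hypothetical, and `Entry` is a trimmed-down TopicEntity that only carries the topic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class EndpointLookupDemo {

    // Hypothetical, trimmed-down TopicEntity: only the topic, no Kafka template.
    static final class Entry {
        final String topic;

        Entry(String topic) {
            this.topic = topic;
        }
    }

    private static final Map<String, Entry> ENTRIES = new HashMap<>();
    static {
        ENTRIES.put("sms", new Entry("tp-sms.request"));
        ENTRIES.put("email", new Entry("tp-email.request"));
    }

    // Empty Optional for a null or unregistered endpoint type.
    static Optional<Entry> find(String endpointType) {
        return Optional.ofNullable(endpointType)
                .map(String::toLowerCase)
                .map(ENTRIES::get);
    }

    // Mirrors the question's default branch: unknown types yield "error".
    static String statusFor(String endpointType) {
        return find(endpointType)
                .map(entry -> "ok:" + entry.topic)
                .orElse("error");
    }

    public static void main(String[] args) {
        System.out.println(statusFor("SMS"));
        System.out.println(statusFor("fax"));
    }
}
```

In the real code, the `orElse` branch is where the `ResponseFactory.builder().status("error").build()` fallback from the question would go.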


3- Getter methods

The simplest way, just to make the main code look cleaner.

ProducerRecord<String, Object> record = new ProducerRecord<>(getTopic(endpointType),tf);
RequestReplyFuture<String, Object, Object> replyFuture = getFuture(endpointType,record);
/*rest of the code here (common regardless of type)*/

And the getters:

String getTopic(String e)
{
   switch(e.toLowerCase())
   {
      case "email"  : return "tp-email.request"; 
      case "sms"    : return "tp-sms.request";
      case "network": return "tp-network.request";
      default : /*handle error*/ return null; 
                /*kafka's response - "topic cannot be null");*/
    }
}

RequestReplyFuture<String, Object, Object> getFuture(String e, ProducerRecord<String, Object> r)
{
  switch(e.toLowerCase())
  {
     case "email": 
          return processingTransactionEmailReplyKafkaTemplate.sendAndReceive(r);
     case "sms" :
           return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
     case "network": 
           return processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(r);
     default : /*handle error*/ return null;
  }            /*this one should never be executed*/
}
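
The `/*handle error*/` defaults above return null, which only moves the failure further down (a null topic, or a NullPointerException on the returned future). One possible way to handle the error is to fail fast with an exception. A minimal sketch, assuming an unchecked exception is acceptable in your code base; `TopicResolver` is an illustrative class name.

```java
final class TopicResolver {

    // Same mapping as getTopic() above, but an unknown type fails immediately.
    static String getTopic(String endpointType) {
        switch (endpointType.toLowerCase()) {
            case "email":
                return "tp-email.request";
            case "sms":
                return "tp-sms.request";
            case "network":
                return "tp-network.request";
            default:
                throw new IllegalArgumentException(
                        "Unknown endpoint type: " + endpointType);
        }
    }

    public static void main(String[] args) {
        System.out.println(getTopic("SMS"));
        try {
            getTopic("fax");
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

The caller can catch the exception in one place and build the error response there, instead of checking for null after every getter.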

4- Single setter

Well, maybe this one is the simplest way... it would be a close call between approaches 3 and 4.

ReplyingKafkaTemplate<String, Object, Object> template;
String topic;
//...

void setParameters(String e)
{
  switch(e.toLowerCase())
  {
    case "email"  : 
          topic = "tp-email.request"; 
          template = processingTransactionEmailReplyKafkaTemplate;
          break;         
    case "sms"    :       
          topic = "tp-sms.request"; 
          template = processingTransactionSmsReplyKafkaTemplate;
          break;         
     //...
   }
}
//...

setParameters(endpointType);

ProducerRecord<String, Object> r = new ProducerRecord<>(topic, tf);
RequestReplyFuture<String, Object, Object> rf = template.sendAndReceive(r);
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
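
All the snippets above block on `get(10, TimeUnit.SECONDS)`, which throws the checked exceptions InterruptedException, ExecutionException and TimeoutException, so the surrounding method has to declare or catch them. A hedged sketch of catching them in one helper and turning them into an error result. `CompletableFuture<String>` is only a stand-in for RequestReplyFuture so the example runs without Kafka, and `ReplyWaitDemo` and `awaitReply` are made-up names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyWaitDemo {

    // CompletableFuture<String> stands in for RequestReplyFuture in this sketch.
    static String awaitReply(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // no reply arrived within the timeout
            return "error: timed out";
        } catch (ExecutionException e) {
            // the future completed with a failure; the cause holds the real error
            return "error: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still see it
            Thread.currentThread().interrupt();
            return "error: interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitReply(CompletableFuture.completedFuture("ok"), 50));
        System.out.println(awaitReply(new CompletableFuture<>(), 50));
    }
}
```

In the real code the error strings would become something like the `ResponseFactory.builder().status("error").build()` value from the question's default branch.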

1.a)- Spring and GeneralHandler

Disclaimer: I know next to nothing about Spring, so this may be totally incorrect.

From what I've read, the abstract class doesn't need any annotation; only the fields that are accessed by the children should need @Autowired.

abstract class GeneralKafkaHandler 
{
   public abstract RequestReplyFuture<String, Object, Object> 
                   getFuture(ProducerRecord<String, Object> r);
   public abstract String getName();

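   //note: @Autowired on String/int fields only resolves if beans of those types exist;
   //plain configuration values are usually injected with @Value instead
   @Autowired
   protected String topic;
   @Autowired
   protected int id;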

   ResponseFactory processingPeply = null;

   public GeneralKafkaHandler(String topic, int id) 
   {
       this.topic = topic; 
       this.id = id;
   }

   public void handle(Object tf) //the main/common logic is implemented here
           throws InterruptedException, ExecutionException, TimeoutException
   {
       ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
       RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
       SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
       ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
       processingPeply = (ResponseFactory) consumerRecord.value();
   }

   //...
}

The children should have the @Component annotation, as well as @Autowired on the constructor. I'm not really sure about the latter, as the examples I've seen also include fields that are defined in the child.

@Component
class SmsKafkaHandler extends GeneralKafkaHandler 
{
   //Sms specific variables, methods,..
    
   @Autowired  //not sure about this..
   public SmsKafkaHandler(String topic, int id) 
   {
      super(topic, id);
      //sms code
   }

   @Override
   public String getName() 
   {
      return "SMSHandler_"+topic+"_"+id;
   }

   @Override
   public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r)
   {
      //sms code
      return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
   }

   //...
}

Really, I don't know what I'm talking about with this Spring solution; I don't even know what those annotations are. The meme of the dog looking at a computer represents me at this moment, so take this with caution...


DRY is for losers

aran answered Mar 25 '23 00:03