I would like to send data to different Kafka topics based on configuration:
ResponseFactory processingPeply = null;
switch(endpointType)
{
case "email":
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "sms":
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "network":
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
default:
processingPeply = ResponseFactory.builder().status("error").build();
}
I currently get:
Do you know how I could redesign this code in a better way to solve the issue?
Here are 4 possible approaches that avoid switch blocks in the core code and honor one of DRY's principles: avoiding duplicated code. (DRY represents a much bigger concept than just not repeating code.)
1- GeneralHandler and endpoint-type children
Something like a hierarchical class tree here, with the different endpoints being extensions of an abstract/general parent.
              [GeneralKafkaHandler]            <- core/common logic
              /         |         \
             v          v          v
  {SmsHandler} {EmailHandler} {NetworkHandler} <- specific params/methods
For example, getTopic() and getFuture() could be abstract on the parent and implemented by each child with its own logic. Another option would be making getKafkaTemplate() another abstract method (choose between getFuture() or getKafkaTemplate()). This is a simplification of the hierarchy, and the topic is retrieved from the constructor.
Abstract father
abstract class GeneralKafkaHandler
{
public abstract RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r);
public abstract String getName();
protected String topic;
protected int id;
ResponseFactory processingPeply = null;
public GeneralKafkaHandler(String topic, int id)
{
this.topic = topic;
this.id = id;
}
public void handle(Object tf) throws Exception //the main/common logic is implemented here; .get() throws checked exceptions
{
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
RequestReplyFuture<String, Object, Object> rf = getFuture(record);
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
}
//...
}
SmsHandler
class SmsKafkaHandler extends GeneralKafkaHandler
{
//Sms specific variables, methods,..
public SmsKafkaHandler(String topic, int id)
{
super(topic, id);
//sms code
}
@Override
public String getName()
{
return "SMSHandler_" + topic + "_" + id;
}
@Override
public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r)
{
//sms code
return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
}
//...
}
Main
(just an example)
Map<String, GeneralKafkaHandler> handlerMap = new HashMap<>();
handlerMap.put("sms", new SmsKafkaHandler("tp-sms.request",1));
handlerMap.put("smsplus", new SmsKafkaHandler("tp-sms-plus.request",2));
handlerMap.put("email", new EmailKafkaHandler("tp-email.request",1));
//...
handlerMap.get(endpointType.toLowerCase()).handle(tf);
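One caveat with the map lookup: unlike the original switch, which had a default branch, Map.get returns null for an unknown endpoint type. A minimal, Kafka-free sketch of guarding that with getOrDefault; the Handler interface here is just a stand-in for GeneralKafkaHandler, and the names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class HandlerLookup {
    // Stand-in for GeneralKafkaHandler; returns a reply string instead of a ResponseFactory
    interface Handler {
        String handle(Object tf);
    }

    public static void main(String[] args) {
        Map<String, Handler> handlerMap = new HashMap<>();
        handlerMap.put("sms", tf -> "sms:" + tf);
        handlerMap.put("email", tf -> "email:" + tf);

        // getOrDefault restores the error behavior of the original default branch
        Handler fallback = tf -> "error";
        System.out.println(handlerMap.getOrDefault("sms", fallback).handle("x")); // sms:x
        System.out.println(handlerMap.getOrDefault("fax", fallback).handle("x")); // error
    }
}
```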
There are different options here; for example, sendAndReceive() is also common to all types, so getFuture() could be replaced by just a getTemplate() method. There are many options to play with here.
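To make that getTemplate() variant concrete, here is a self-contained sketch of the same template-method idea in plain Java; a Function stands in for ReplyingKafkaTemplate, and all the names are illustrative:

```java
import java.util.function.Function;

// The parent owns the whole send/receive flow and only asks the child which "template" to use.
abstract class Handler {
    protected final String topic;

    Handler(String topic) { this.topic = topic; }

    // child supplies its template instead of a ready-made future
    protected abstract Function<String, String> getTemplate();

    String handle(String payload) {
        // common logic lives here once, regardless of endpoint type
        return getTemplate().apply(topic + "/" + payload);
    }
}

class SmsHandler extends Handler {
    SmsHandler() { super("tp-sms.request"); }

    @Override
    protected Function<String, String> getTemplate() {
        return msg -> "sms-reply:" + msg; // stands in for sendAndReceive
    }
}

public class TemplateVariant {
    public static void main(String[] args) {
        System.out.println(new SmsHandler().handle("hello")); // sms-reply:tp-sms.request/hello
    }
}
```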
This approach would be a good idea if you need or wish to manage each endpoint in more depth; consider it if you think the separate management is worth it now, or will be in the future. As the core mechanism is the same, different extensions would let you quickly implement new endpoint types.
2- Custom entity
In essence, there are just 2 different elements regarding the endpoint type:
Topic
ReplyingKafkaTemplate
You could wrap them into a single Object. For example:
public class TopicEntity
{
public final String topic;
public final ReplyingKafkaTemplate<String,Object,Object> template;
public TopicEntity(String topic, ReplyingKafkaTemplate<String,Object,Object> template)
{
this.topic = topic;
this.template = template;
}
}
So then you can get this without modifying your current code (here I assume your templates are already initialized):
TopicEntity smsE = new TopicEntity("tp-sms.request",
processingTransactionSmsReplyKafkaTemplate);
TopicEntity mailE = new TopicEntity("tp-email.request",
processingTransactionEmailReplyKafkaTemplate);
Map<String, TopicEntity> handlerMap = new HashMap<>();
handlerMap.put("sms", smsE);
handlerMap.put("email",mailE);
//...
TopicEntity te = handlerMap.get(endpointType.toLowerCase());
//Based on endpoint
ProducerRecord<String, Object> record = new ProducerRecord<>(te.topic, tf);
RequestReplyFuture<String, Object, Object> rf = te.template.sendAndReceive(record);
//Common regardless of endpoint
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
Pretty simple, and it also avoids duplicated code; the entity would additionally let you define specific characteristics for each endpoint.
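As one concrete (hypothetical) example of such a characteristic, the entity could carry a per-endpoint reply timeout; the template field is omitted here just to keep the sketch self-contained:

```java
// Extended variant of TopicEntity carrying an endpoint-specific setting.
public class TopicEntityWithTimeout {
    public final String topic;
    public final long timeoutSeconds; // e.g. email replies may be slower than sms

    public TopicEntityWithTimeout(String topic, long timeoutSeconds) {
        this.topic = topic;
        this.timeoutSeconds = timeoutSeconds;
    }

    public static void main(String[] args) {
        TopicEntityWithTimeout sms = new TopicEntityWithTimeout("tp-sms.request", 10);
        TopicEntityWithTimeout email = new TopicEntityWithTimeout("tp-email.request", 30);
        // the timeout would then feed rf.get(te.timeoutSeconds, TimeUnit.SECONDS)
        System.out.println(sms.topic + " -> " + sms.timeoutSeconds + "s");
        System.out.println(email.topic + " -> " + email.timeoutSeconds + "s");
    }
}
```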
3- Getter methods
The simplest way, just to make the main code look cleaner.
ProducerRecord<String, Object> record = new ProducerRecord<>(getTopic(endpointType),tf);
RequestReplyFuture<String, Object, Object> replyFuture = getFuture(endpointType,record);
/*rest of the code here (common regardless type)*/
And the getters:
String getTopic(String e)
{
switch(e.toLowerCase())
{
case "email" : return "tp-email.request";
case "sms" : return "tp-sms.request";
case "network": return "tp-network.request";
case default : /*handle error*/ return null;
               /*Kafka would reject it anyway: "topic cannot be null"*/
}
}
RequestReplyFuture<String, Object, Object> getFuture(String e, ProducerRecord<String, Object> r)
{
switch(e.toLowerCase())
{
case "email":
return processingTransactionEmailReplyKafkaTemplate.sendAndReceive(r);
case "sms" :
return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
case "network":
return processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(r);
    default : /*handle error*/ return null; /*this one should never be executed*/
    }
}
4- Single setter
Well, maybe this one is the simplest way... it would be a close call between approaches 3 and 4.
ReplyingKafkaTemplate<String, Object, Object> template;
String topic;
//...
void setParameters(String e)
{
switch(e.toLowerCase())
{
case "email" :
topic = "tp-email.request";
template = processingTransactionEmailReplyKafkaTemplate;
break;
case "sms" :
topic = "tp-sms.request";
template = processingTransactionSmsReplyKafkaTemplate;
break;
//...
}
}
//...
setParameters(endpointType);
ProducerRecord<String, Object> r= new ProducerRecord<>(topic,tf);
RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(r);
SendResult<String, Object> sr = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
1.a)- Spring and GeneralHandler
Spoiler: I don't know much about Spring, so this may be totally incorrect.
From what I've read here, the abstract class doesn't need any annotation; only the bean dependencies accessed by the children (such as the ReplyingKafkaTemplate instances) would need @Autowired. Plain configuration values like topic and id are not beans, so they would not be autowired.
abstract class GeneralKafkaHandler
{
public abstract RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r);
public abstract String getName();
protected String topic;
protected int id;
ResponseFactory processingPeply = null;
public GeneralKafkaHandler(String topic, int id)
{
this.topic = topic;
this.id = id;
}
public void handle(Object tf) throws Exception //the main/common logic is implemented here
{
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
RequestReplyFuture<String, Object, Object> rf = getFuture(record);
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
}
//...
}
And the children should have the @Component
annotation, as well as @Autowired
in the constructor; I'm not really sure about the last one, as the examples I've seen also include fields that are also defined in the child.
@Component
class SmsKafkaHandler extends GeneralKafkaHandler
{
//Sms specific variables, methods,..
@Autowired
private ReplyingKafkaTemplate<String, Object, Object> processingTransactionSmsReplyKafkaTemplate;
@Autowired //not sure about this..
public SmsKafkaHandler(String topic, int id)
{
super(topic, id);
//sms code
}
@Override
public String getName()
{
return "SMSHandler_" + topic + "_" + id;
}
@Override
public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<String, Object> r)
{
//sms code
return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
}
//...
}
Really, I'm not confident about this Spring part; I barely know what those annotations do, so take it carefully...