I have a spring-cloud-stream application with kafka binding. I would like to create a Test Junit.
I have my class definitions such as below:-
@EnableBinding(Sink.class)
@Slf4j
public class Messaging {
@Autowired
private RestTemplate restTemplate;
@Value("${messaging}")
private String url;
@Value("${messaging.prefix}")
private String messaging;
@StreamListener(Sink.INPUT)
public void handle(Message<String> request) {
log.info("Topic name ==> %s :", request.getPayload());
try {
String jsonString = request.getPayload().replace("\\", "").replace("\"{", "{").replace("}\"", "}");
JsonObject jsonObject = (JsonObject)jsonParser.parse(request.getPayload());
String urlRequest =url.concat(jsonObject.get("targetClass").getAsString()).concat(messaging);
HttpEntity<Object> entity = new HttpEntity<Object>(jsonString, getHeaderMap(request.getHeaders()));
ResponseEntity<String> response = restTemplate.postForEntity(urlRequest, entity, String.class);
} catch (ValidationException validationException) {
log.error("Error de validación: {}", validationException.getMessage());
} catch (Exception e) {
log.error("Error ", e.getMessage());
}
}
I have my channel definitions such as below:-
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
And this my application properties to bind your channels to the same queue:
spring:
cloud:
bus:
destination:CONFIG
enabled: true
stream:
bindings:
input:
group: input_messaging
contentType: application/json
destination: CONFIG_Test1,CONFIG_Test2
This is the test that I have created but it give this erros : Caused by: org.springframework.context.ApplicationContextException: Unable to start ServletWebServerApplicationContext due to missing ServletWebServerFactory bean.
@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource("classpath:test.properties")
@EnableConfigurationProperties
@ContextConfiguration(classes = MessagingTestConfig.class)
public class MessagingListenerTest {
@Before
public void setup() {
}
@Test
@SuppressWarnings("unchecked")
public void testReturn() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(MessagingTest.class,
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.contentType=text/plain",
"--spring.cloud.stream.bindings.output.contentType=text/plain");
MessageCollector collector = context.getBean(MessageCollector.class);
Processor processor = context.getBean(Processor.class);
Sink inputChannel = context.getBean(Sink.class);
Message<String> request = MessageBuilder.withPayload("headers").setHeader("topicName", "topic-1").build();
inputChannel.input()
.send(request);
Message<String> message = (Message<String>) collector
.forChannel(processor.output()).poll(1, TimeUnit.SECONDS);
assertThat(message).isNotNull();
assertThat(message.getPayload()).contains("topicName");
context.close();
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public class TestProcessor {
@StreamListener(Sink.INPUT)
public Message<String> hundle(Message<String> messageHundle) {
return messageHundle;
}
}
}
I wonder if there is and a way to test my class, how to do it and thanks for your help
This is the change using the latest version of Spring Cloud Stream
package com.common.messaging;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.RestTemplate;
import lombok.extern.slf4j.Slf4j;
@EnableIntegration
@Configuration
@TestPropertySource(locations="classpath:/msc-test.properties")
@Slf4j
@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@ContextConfiguration(classes = MessagingListenerTestConfig.class)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class MessagingListenerTest {
@Autowired
private MessagingListener listener;
@Autowired
private InputDestination inputDestination;
@Autowired
private OutputDestination outputDestination;
@Mock
private RestTemplate restTemplate;
private static final String URL = "http://localhost:8080/";
@Before
public void setup() {
restTemplate = mock(RestTemplate.class);
ReflectionTestUtils.setField(listener, "restTemplate", restTemplate);
ResponseEntity<String> mockResponse = new ResponseEntity<>("{}", HttpStatus.ACCEPTED);
when(restTemplate.postForEntity(any(), any(), eq(String.class))).thenReturn(mockResponse);
}
@Test
public void testHundleMessage() {
String expectedUrl = URL;
Message<String> request = MessageBuilder.withPayload("headers").setHeader("topicName", "topic-1").build();
log.info("request Test :", request.getPayload());
//inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
listener.handle(request);
//Verificar la recepción de los mensajes
assertThat(outputDestination.receive()).isNotNull();
assertThat(outputDestination.receive().getPayload().toString()).contains("topicName");
//Verificar la url del restTemplate
Mockito.verify(restTemplate, Mockito.times(1)).postForEntity(expectedUrl, any(), eq(String.class));
}
}
but given this error, and I do not know why
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at org.springframework.cloud.stream.binder.test.OutputDestination.receive(OutputDestination.java:59)
at org.springframework.cloud.stream.binder.test.OutputDestination.receive(OutputDestination.java:73)
at com.common.messaging.MessagingListenerTest.testHundleMessage(MessagingListenerTest.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
I still having the same problems
@EnableIntegration
@Configuration
@TestPropertySource(locations="classpath:/msc-test.properties")
@Slf4j
@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@ContextConfiguration(classes = MessagingListenerTestConfig.class)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class MessagingListenerTest {
@Autowired
private MessagingListener listener;
@Autowired
private InputDestination inputDestination;
@Autowired
private OutputDestination outputDestination;
@Mock
private RestTemplate restTemplate;
private static final String EXPECTED_URL = "http://localhost:11000/test/v2/verification/messaging";
@Before
public void setup() {
restTemplate = mock(RestTemplate.class);
ReflectionTestUtils.setField(listener, "restTemplate", restTemplate);
ResponseEntity<String> mockResponse = new ResponseEntity<>("{}", HttpStatus.ACCEPTED);
when(restTemplate.postForEntity(any(), any(), eq(String.class))).thenReturn(mockResponse);
}
@Test
public void testHundleMessage() {
JSONObject obj1 = new JSONObject()
.put("id", 1)
.put("targetClass", "/test/v2/verification");
Message<String> request = MessageBuilder.withPayload(obj1.toString()).build();
log.info("request Test : "+ request.getPayload());
inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
listener.handle(request);
//Verificar la url del restTemplate
Mockito.verify(restTemplate, Mockito.times(1)).postForEntity(eq(EXPECTED_URL), any(), eq(String.class));
//Verificar la recepción de los mensajes
assertThat(outputDestination.receive()).isNotNull();
assertThat(outputDestination.receive().getPayload().toString()).contains("topicName");
}
}
just in this line
inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
and this the error Junit
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
at com.common.messaging.MessagingListenerTest.testHundleMessage(MessagingListenerTest.java:93)
and the console error
2020-07-14 11:29:16.850 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : The following profiles are active: test
2020-07-14 11:29:18.171 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-07-14 11:29:18.192 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-07-14 11:29:18.212 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-07-14 11:29:18.392 INFO 25240 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-07-14 11:29:18.429 INFO 25240 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-07-14 11:29:20.113 INFO 25240 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-07-14 11:29:20.356 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-07-14 11:29:20.358 INFO 25240 --- [ main] o.s.i.c.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-07-14 11:29:20.361 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-07-14 11:29:20.382 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : Started MessagingListenerTest in 4.629 seconds (JVM running for 9.331)
2020-07-14 11:29:23.255 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : request Test : {"targetClass":"/test/v2/verification","id":1}
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.c.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2020-07-14 11:29:28.208 INFO 25240 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:\windows\system32\Aternity\Java\JavaHookLoader.dll"="C:\ProgramData\Aternity\hooks"
what could be the problem
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With