Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Webflux(Mono/Flux) with AOP triggering REST call at interception and working with Mono/Flux

I have written an @Aspect to intercept reactive methods that return their values as a Mono/Flux. Using @AfterReturning advice, I'm trying to fire an APNS notification by calling a web service.

Unfortunately, the processNotification service call immediately returns an onComplete signal without executing the chain of calls. Below is my sample program.

@Aspect
@Component
@Slf4j
public class NotifyAspect{
    private final NotificationServiceHelper notificationServiceHelper;

    @Autowired
    public NotifyAspect(NotificationServiceHelper notificationServiceHelper) {
        this.notificationServiceHelper = notificationServiceHelper;
    }

    @AfterReturning(pointcut = "@annotation(com.cupid9.api.common.annotations.Notify)", returning = "returnValue")
    public void generateNotification(JoinPoint joinPoint, Object returnValue) throws Throwable {
        log.info("AfterReturning Advice - Intercepting Method : {}", joinPoint.getSignature().getName());

        //Get Intercepted method details.
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        //Get the Notification Details.
        Notify myNotify = method.getAnnotation(Notify.class);
        if (Mono.class.isAssignableFrom(returnValue.getClass())) {
            Mono<Object> result = (Mono<Object>) returnValue;
            result.doOnSubscribe(o -> {
                log.debug("On Subscription...");
                notificationServiceHelper.processNotification(myNotify.notificationType())
                .doOnError(throwable -> {
                    log.error("Exception in notification processor",throwable);
                });
            });
        }
    }

}
@Slf4j
@Service
public class NotificationServiceHelper {
    private ReactiveUserProfileRepository userProfileRepository;

    @Value("${services.notification.url}")
    private String notificationServiceUrl;

    private RestWebClient restWebClient;

    @Autowired
    public NotificationServiceHelper(RestWebClient restWebClient,
                                     ReactiveUserProfileRepository reactiveUserProfileRepository) {
        this.restWebClient = restWebClient;
        this.userProfileRepository = reactiveUserProfileRepository;
    }

    public Flux<Notification> processNotification(NotificationSchema.NotificationType notificationType) {
        /*Get user profile details*/
        return SessionHelper.getProfileId()
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 1!")))
                .flatMap(profileId ->
                        Mono.zip(userProfileRepository.findByIdAndStatus(profileId, Status.Active), SessionHelper.getJwtToken()))
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 2!")))
                .flatMapMany(tuple2 ->{
                    //Get user details and make sure there are some valid devices associated.
                    var userProfileSchema = tuple2.getT1();
                    log.info("Processing Notifications for User Profile : {}", userProfileSchema.getId());
                    if (Objects.isNull(userProfileSchema.getDevices()) || (userProfileSchema.getDevices().size() < 1)) {
                        return Flux.error(new InternalServerError("No Devices associate with this user. Can not send notifications."));
                    }

                    //Build Notification message from the Notification Type
                    var notificationsMap = new LinkedHashSet<Notification>();
                    userProfileSchema.getDevices().forEach(device -> {
                        var notificationPayload = Notification.builder()
                                .notificationType(notificationType)
                                .receiverDevice(device)
                                .receiverProfileRef(userProfileSchema.getId())
                                .build();
                        notificationsMap.add(notificationPayload);
                    });

                    //Get session token for authorization
                    var jwtToken = tuple2.getT2();

                    //Build the URI needed to make the rest call.
                    var uri = UriComponentsBuilder.fromUriString(notificationServiceUrl).build().toUri();
                    log.info("URI built String : {}", uri.toString());

                    //Build the Headers needed to make the rest call.
                    var headers = new HttpHeaders();
                    headers.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.AUTHORIZATION, jwtToken);

                    var publishers = new ArrayList<Mono<ClientResponse>>();
                    notificationsMap.forEach(notification -> {
                        publishers.add(restWebClient.post(uri, headers, notification));
                    });
                    return Flux.merge(publishers).flatMap(clientResponse -> {
                        var httpStatus = clientResponse.statusCode();
                        log.info("NotificationService HTTP status code : {}", httpStatus.value());
                        if (httpStatus.is2xxSuccessful()) {
                            log.info("Successfully received response from Notification Service...");
                            return clientResponse.bodyToMono(Notification.class);
                        } else {
                            // return Flux.empty();
                            return clientResponse.bodyToMono(Error.class)
                                    .flatMap(error -> {
                                        log.error("Error calling Notification Service :{}", httpStatus.getReasonPhrase());
                                        if (httpStatus.value() == 400) {
                                            return Mono.error(new BadRequest(error.getMessage()));
                                        }
                                        return Mono.error(new InternalServerError(String.format("Error calling Notification Service : %s", error.getMessage())));
                                    });
                        }
                    });
                }).doOnError(throwable -> {
                    throw new InternalServerError(throwable.getMessage(), throwable);
                });
    }

}

How can we trigger this call asynchronously, without making the intercepted method wait? Right now processNotification always returns an onComplete signal without executing; the chain does not execute as expected.

like image 678
Ahamed Avatar asked Oct 21 '25 01:10

Ahamed


1 Answer

/**
 * Annotation applicable to methods and parameters, retained at runtime and included in
 * generated documentation.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.PARAMETER})
public @interface Log {

    /**
     * Title associated with the annotated element.
     *
     * @return the title; the empty string when not specified
     */
    String title() default "";

}


@SuppressWarnings({"unchecked"})
@Around("@annotation(operlog)")
public Mono<Result> doAround(ProceedingJoinPoint joinPoint, Log operlog) {
    Mono<Result> mono;

    try {
        mono = (Mono<Result>) joinPoint.proceed();
    } catch (Throwable throwable) {
        throw new RuntimeException(throwable);
    }

    return mono.doOnNext(result -> {
                //doSomething(result);
            };

}
like image 159
oycc Avatar answered Oct 25 '25 12:10

oycc