In Spring 5.0.0.RC4 reference documentation, it said:
Publisher or Flow.Publisher — any type implementing Reactive Streams Publisher is supported.
https://docs.spring.io/spring/docs/5.0.0.RC4/spring-framework-reference/reactive-web.html#webflux
But when I created a simple project based on Spring 5.0.0.RC4, I got failure when return Flow.Publisher
in a controller. It seems the Flow.Publisher
can not be serialized by jackson.
org.springframework.core.codec.CodecException: Type definition error: [simple type, class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.concurrent.SubmissionPublisher["executor"]->java.util.concurrent.ForkJoinPool["factory"])
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:132)
at org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$0(AbstractJackson2Encoder.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:91)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onSubscribe(ChannelSendOperator.java:143)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:76)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1068)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:72)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1068)
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:290)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1625)
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:279)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:161)
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:798)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1625)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1439)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1313)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:2757)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:91)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:55)
at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:2757)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)
at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:354)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.concurrent.SubmissionPublisher["executor"]->java.util.concurrent.ForkJoinPool["factory"])
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:312)
at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1396)
at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1120)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:950)
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:129)
... 65 common frames omitted
Update 1: The complete source codes for Java 9.
@GetMapping
public Flow.Publisher<Post> all() {
SubmissionPublisher publisher = new SubmissionPublisher();
publisher.submit(new Post(1L, "post one", "content of post one"));
publisher.submit(new Post(2L, "post two", "content of post two"));
return publisher;
}
Update 2: In the reference doc of Spring 5.0.0.RELEASE, I can not find the statement there.
Update 3: Updated the Spring 5.0.2(managed by Spring Boot 2.0.0.M7), the codes is running well without any exceptions, but it will block the request when access it. Check my updated sample codes.
Indeed, this is not supported right now.
Please subscribe to the issue SPR-16052 to know when it's available.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With