I'm using the Akka Streams GraphDSL to create a runnable graph. There are no compile-time errors with respect to the inlets / outlets of the stream components, but at runtime it throws the error below. Any ideas what I should verify to make it run?
requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)
The graph structure:
source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore
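You have an unused inlet or outlet: something you added to the builder isn't connected on all sides. Every port has to be either wired to another component or exposed through the shape you return. As far as I can tell from the message, the first pair of lists is the shape your builder block returned (empty for a closed graph) and the second pair is the ports that are still open, so a port named in and a port named out were left unconnected. Here are some examples: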
This works:
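import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL}

val workingFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val intFlow = b.add(Flow[Int])
    FlowShape(intFlow.in, intFlow.out) // both ports of intFlow are exposed
  })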
The following code produces an error similar to yours, because unusedFlow is added to the builder but never connected to anything:
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL}

val buggyFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val intFlow = b.add(Flow[Int])
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow is unused
    FlowShape(intFlow.in, intFlow.out)
  })
A slightly more complex example: here no flow is entirely unused, but the outlet of unusedFlow is left unconnected. It produces the same kind of error:
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL}

val buggyFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val broadcast = b.add(Broadcast[Int](2))
    val intFlow = b.add(Flow[Int])
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow's outlet isn't used
    broadcast ~> intFlow
    broadcast ~> unusedFlow
    FlowShape(broadcast.in, intFlow.out)
  })
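One possible fix for that last example, as a minimal sketch rather than the only way to do it: give the dangling branch somewhere to go, here by draining it into Sink.ignore. The names fixedFlow and sideFlow are made up for illustration. The ActorSystem / ActorMaterializer setup is an assumption about your Akka version (ActorMaterializer is deprecated in newer releases, where an implicit ActorSystem alone is enough). It is written script-style, so paste it into a REPL or wrap it in a main method.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumed setup: an actor system plus an explicit materializer.
implicit val system: ActorSystem = ActorSystem("fixed-flow-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val fixedFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val broadcast = b.add(Broadcast[Int](2))
    val intFlow = b.add(Flow[Int])
    val sideFlow = b.add(Flow[Int]) // hypothetical second branch
    broadcast ~> intFlow
    broadcast ~> sideFlow ~> Sink.ignore // the second branch now ends in a sink
    FlowShape(broadcast.in, intFlow.out) // only these two ports stay open
  })

// Run a few elements through the flow to check that it materializes.
val result = Await.result(Source(1 to 3).via(fixedFlow).runWith(Sink.seq), 5.seconds)
println(result)

system.terminate()
```

The same check applies to the graph in the question: every component passed to b.add needs each of its inlets and outlets either wired with ~> or returned as part of the shape.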