apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0

`Source.combine` failing with exception for input sources #1458

Closed mdedetrich closed 2 weeks ago

mdedetrich commented 2 weeks ago

I am having an issue where `Source.combine` fails for a List of Sources, throwing the exception below. In this specific case the Source happens to be 6 elements long. I am currently trying to diagnose what is causing the issue (as you can see, the exception isn't too helpful).

2024-08-29 10:07:26.284 [default-pekko.actor.default-dispatcher-4] ERROR o.a.pekko.actor.RepointableActorRef - Error during preStart in [org.apache.pekko.stream.impl.SetupSourceStage$$anon$2-setup(org.apache.pekko.stream.javadsl-Source.scala:699)]: 2 is out of bounds (min 0, max 1)
java.lang.IndexOutOfBoundsException: 2 is out of bounds (min 0, max 1)
        at scala.collection.immutable.Vector.ioob(Vector.scala:285)
        at scala.collection.immutable.Vector1.apply(Vector.scala:390)
        at org.apache.pekko.stream.UniformFanInShape.in(UniformFanInShape.scala:50)
        at org.apache.pekko.stream.scaladsl.Source$.$anonfun$combine$7(Source.scala:865)
        at org.apache.pekko.stream.scaladsl.Source$.$anonfun$combine$7$adapted(Source.scala:864)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
        at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:905)
        at org.apache.pekko.stream.javadsl.Source.combine(Source.scala:754)
        <REDACTED BUT POINTS to Source.combine(sources, Concat::create).withAttributes(attr).runWith(Sink.collect(Collectors.toList()), mat) inside of Source.completionStage>
        at org.apache.pekko.stream.javadsl.Source$.$anonfun$fromMaterializer$1(Source.scala:699)
        at org.apache.pekko.stream.impl.SetupSourceStage$$anon$2.preStart(SetupStage.scala:100)
        at org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:317)
        at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
        at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:740)
        at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:783)
        at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.org$apache$pekko$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:801)
        at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:832)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:729)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
pjfanning commented 2 weeks ago

For the record, what version of Pekko are you using? Ideally, this should be tried with a Pekko 1.0 release (e.g. 1.0.3) and with 1.1.0-M1, so that we know how long this has been an issue.

mdedetrich commented 2 weeks ago

I'm using one of the latest 1.1.0-M1 snapshots, but I don't think this part of the code has been touched since the inception of the fork.

mdedetrich commented 2 weeks ago

I am closing this because the error was PEBCAK: instead of using `Concat::create` (which is what you should be using), I had `Concat.create()`. The former passes a method reference, so the size argument is correctly passed into `Concat.create(arg)`, whereas the latter ignores the size argument (hence the index out of bounds exception).

There is an argument to be made that we should try and catch the IndexOutOfBoundsException and rethrow another exception with a more helpful error message, @pjfanning wdyt?

In any case will close the issue, sorry for the circus.
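
For anyone landing here with the same trace, here is a rough, library-free model of the failure mode described above. It is only a sketch: `FanIn`, `create`, and `wire` are hypothetical stand-ins and not Pekko APIs, and the default of two input ports is an assumption chosen because it matches the "max 1" in the trace.

```java
import java.util.List;
import java.util.function.IntFunction;

public class FanInArityDemo {
    // Hypothetical stand-in for a fan-in stage with a fixed number of input ports.
    public static final class FanIn {
        final int inputPorts;

        FanIn(int inputPorts) {
            this.inputPorts = inputPorts;
        }

        // Looks up an input port by index and fails with a message shaped like the one in the trace.
        int in(int index) {
            if (index < 0 || index >= inputPorts) {
                throw new IndexOutOfBoundsException(
                    index + " is out of bounds (min 0, max " + (inputPorts - 1) + ")");
            }
            return index;
        }
    }

    // Assumed default: two input ports when no size is given.
    public static FanIn create() {
        return create(2);
    }

    public static FanIn create(int inputPorts) {
        return new FanIn(inputPorts);
    }

    // Asks the strategy for a stage sized for the sources, then claims one port per source.
    public static int wire(List<String> sources, IntFunction<FanIn> strategy) {
        FanIn fanIn = strategy.apply(sources.size());
        int wired = 0;
        for (int i = 0; i < sources.size(); i++) {
            fanIn.in(i);
            wired++;
        }
        return wired;
    }

    public static void main(String[] args) {
        List<String> sources = List.of("a", "b", "c", "d", "e", "f");

        // Method reference: the size reaches create(int), so all six ports exist.
        System.out.println(wire(sources, FanInArityDemo::create)); // prints 6

        // Lambda that drops the size: only two ports exist, so the third lookup fails.
        try {
            wire(sources, size -> create());
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage()); // prints "2 is out of bounds (min 0, max 1)"
        }
    }
}
```

The point of the model is that both variants type-check against the same strategy parameter, so the mistake only surfaces once the ports are wired.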

pjfanning commented 2 weeks ago

Seems like a good idea to try to add more informative exception messages.
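
As a minimal sketch of what a more informative message could look like: the guard below is hypothetical (the class name, method name, and wording are all assumptions, and it is not taken from the Pekko code base). It checks the port count up front rather than catching and rethrowing the `IndexOutOfBoundsException`, which is one possible design and not something settled in this thread.

```java
public class FanInArityCheck {
    // Hypothetical guard: compares the ports a strategy produced with the number of sources.
    public static void requireMatchingPorts(int sourceCount, int inputPorts) {
        if (inputPorts != sourceCount) {
            throw new IllegalArgumentException(
                "fan-in strategy returned a stage with " + inputPorts
                    + " input ports, but " + sourceCount + " sources were supplied;"
                    + " the strategy must use the size it is given");
        }
    }

    public static void main(String[] args) {
        // Matching counts pass silently.
        requireMatchingPorts(6, 6);

        // Mismatched counts fail with a message that names both numbers.
        try {
            requireMatchingPorts(6, 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check like this would fail before any port lookup happens, so the message can point at the strategy function instead of at an index.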