akka / akka-meta

This repository is dedicated to high-level feature discussions and for persisting design decisions.
Apache License 2.0
201 stars 23 forks source link

Akka-Http/WebSocket - Substream Source has not been materialized in 5k millis #52

Closed KalyanKadiyala closed 7 years ago

KalyanKadiyala commented 7 years ago

Lib dependency: "com.typesafe.akka" %% "akka-http" % "10.0.7"

Issue Background - I am running a Websocket performance test for a WebSocket Proxy - custom implementation using Akka-Http; connected as a client i.e., test for In-bound messages. Websocket end point happens to be Riemann stream processor. Riemann gets events from a future in my client (scala test) via Riemann TCP and the events are read back over Websocket. Mention of Riemann is for context only.

Regarding API usage...

Socket Request -

http.singleWebSocketRequest(WebSocketRequest(s"ws://${proxyDependency.wsHost}:${proxyDependency.wsPort}/${uriPath}"), clientFlow)

Flow Definition: Flow.fromSinkAndSource(defineSinkForIncoming(fn2HandleInBoundData), Source.maybe) .viaMat(KillSwitches.single)(Keep.right) .takeWhile(nothing => isSinkInUse.get) .watchTermination()((uniqueKillSwitch: UniqueKillSwitch, futureDone: Future[Done]) => { futureDone.map { result => { logThatMessage("Flow is terminated (probably disconnected by the Server or via kill switch.)", None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn) if (!promise2Listen.isCompleted) { promise2Listen success gBoxWebSocketProxyFlowInitStatus.FLOW_TERMINATION_DETECTED } else { logThatMessage("Promise to listen is already completed. Nothing to return here.", None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn) } } } recover { case ex: Throwable => { logThatMessage("Listen channel is already open.", Option(ex), LogLevelIndicators.ERROR, fallBackLogger, localizedLoggerFn) promise2Listen success gBoxWebSocketProxyFlowInitStatus.UNTRACKED_EXCEPTION_DETECTED } }

For context call for defineSinkForIncoming... returns Sink[Message, Future[Done]]!

I've seen posts on this topic in closed state, so was wondering if any one else faced similar issue?

Here's the stack trace: [default-akka.actor.default-dispatcher-16] [akka://default/user/StreamSupervisor-5/flow-0-0-unknown-operation] Error in stage [Split]: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError) scala.MatchError: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError) at akka.stream.impl.fusing.SubSource.failSubstream(StreamOfStreams.scala:706) at akka.stream.stage.GraphStageLogic$SubSourceOutlet.fail(GraphStage.scala:1138) at akka.stream.impl.fusing.Split$$anon$2$SubstreamHandler.onUpstreamFailure(StreamOfStreams.scala:552) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:733) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616) at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471) at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423) at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603) at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

johanandren commented 7 years ago

Hi @ndimensions, this repository is for abstraction discussions about Akka and not for concrete issues, in addition to that I think this sounds more like a question than an issue, so please use the mailing list or gitter channel for questions like this (found from the links if you scroll down on the root repository page: https://github.com/akka/akka) rather than the issue tracker.

If you do ask in the community and confirm if this is in fact a bug or a improvement/feature request, then please open a ticket in the Akka HTTP repository, over here: https://github.com/akka/akka-http

Thanks!

KalyanKadiyala commented 7 years ago

@johanandren thank you for pointing to right place. I've reposted the same on akka-user list.

ctataryn commented 7 years ago

This is a very meta question: I've had limited success getting people to engage on Gitter. It seems like a lot of people just idle there, but don't necessarily contribute so I stopped joining. Do others have similar experiences?


From: @Ganaakruti notifications@github.com Sent: Saturday, June 3, 2017 11:01:56 AM To: akka/akka-meta Cc: Subscribed Subject: Re: [akka/akka-meta] Akka-Http/WebSocket - Substream Source has not been materialized in 5k millis (#52)

@johanandrenhttps://github.com/johanandren thank you for pointing to right place. I've reposted the same on akka-user list.

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHubhttps://github.com/akka/akka-meta/issues/52#issuecomment-305984030, or mute the threadhttps://github.com/notifications/unsubscribe-auth/AAamB3xxSTjkmvLkepeP6PpksPUQdEPVks5sAYN0gaJpZM4Nu8Qv.

olja-xvela commented 5 years ago

@ctataryn Did you resolve this issue?

ktoso commented 5 years ago

Hi @olja-xvela, as Johan requested - please ask questions like this on the issue tracker – if you have a specific piece of code you have trouble with, please post it. In general asking on http://discuss.akka.io is also a good place, but not the "meta" forums. Thanks!