akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

Not systematic StatusRuntimeException with error code INTERNAL_ERROR #1854

Closed ivan71kmayshan27 closed 4 years ago

ivan71kmayshan27 commented 5 years ago

Versions used

Akka version: 2.5.23
Alpakka version: 1.1.0
Scala version: 2.12.8
JDK: openjdk:11.0.2

Expected Behavior

Alpakka for Google Pub/Sub successfully performs non-batch (single) publish/pull operations without failing, retrying on internal errors.

Actual Behavior

Both the Alpakka publisher (GooglePubSub.publish) and the subscriber (GooglePubSub.subscribe) fail under unclear circumstances with the error listed in the logs section, which crashes the stream. The errors appear intermittently, after the streams have been working normally for some time.

Relevant logs

Restarting graph due to failure. stack_trace: 
io.grpc.StatusRuntimeException: INTERNAL: HTTP/2 error code: INTERNAL_ERROR
Received Rst Stream
        at io.grpc.Status.asRuntimeException(Status.java:533)
        at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.onCallClosed(AkkaNettyGrpcClientGraphStage.scala:170)
        at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1(AkkaNettyGrpcClientGraphStage.scala:74)
        at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1$adapted(AkkaNettyGrpcClientGraphStage.scala:70)
        at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:452)
        at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:481)
        at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
        at akka.actor.Actor.aroundReceive(Actor.scala:539)
        at akka.actor.Actor.aroundReceive$(Actor.scala:537)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
        at akka.actor.ActorCell.invoke(ActorCell.scala:581)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
        at akka.dispatch.Mailbox.run(Mailbox.scala:229)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Reproducible Test Case

Due to my NDA I can't publish the code that yields this behaviour. Attempts to write reproduction code unrelated to the original sources were unsuccessful.

ivan71kmayshan27 commented 5 years ago

This code sample probably reproduces the issue. Still, I'm unable to extract the exception from this particular code sample. It runs for roughly 30-35k messages and then stops working.

build.sbt

lazy val reproduce = (project in file("."))
  .settings(commonSettings: _*)
  .settings(
    name := "reproduce",
    libraryDependencies ++= Seq(
      library.akkaActor       % Compile,
      library.circeGeneric    % Compile,
      library.circeParser     % Compile,
      library.scalaLogging    % Compile,
      library.logback         % Runtime,
      library.grpcLogback     % Runtime,
      library.alpakkaPubSubRpc  % Compile,
      library.scalaCache        % Compile,
    ),
  )
lazy val library = new {
  object Version {
    val akka            = "2.5.23"
    val cats            = "1.1.0"
    val circe           = "0.11.1"
    val shapeless       = "2.3.3"
    val logback         = "1.2.3"
    val scalaLogging    = "3.9.0"
    val alpakka         = "1.1.0"
    val grpcLogback     = "0.92.0-alpha"
    val scalaCache      = "0.28.0"
  }

  val akkaActor         = "com.typesafe.akka"           %% "akka-actor"                                     % Version.akka
  val cats              = "org.typelevel"               %% "cats-core"                                      % Version.cats
  val circeGeneric      = "io.circe"                    %% "circe-generic"                                  % Version.circe
  val circeParser       = "io.circe"                    %% "circe-parser"                                   % Version.circe
  val logback           = "ch.qos.logback"              % "logback-classic"                                 % Version.logback
  val scalaLogging      = "com.typesafe.scala-logging"  %% "scala-logging"                                  % Version.scalaLogging
  val alpakkaPubSubRpc  = "com.lightbend.akka"          %% "akka-stream-alpakka-google-cloud-pub-sub-grpc"  % Version.alpakka
  val grpcLogback       = "com.google.cloud"            % "google-cloud-logging-logback"                    % Version.grpcLogback
  val scalaCache        = "com.github.cb372"            %% "scalacache-caffeine"                            % Version.scalaCache
}

lazy val commonSettings = Seq(
  scalaVersion := "2.12.7",
  scalacOptions ++= Seq(
    "-unchecked",
    "-deprecation",
    "-language:_",
    "-encoding",
    "UTF-8",
    "-Xfatal-warnings",
    "-Yno-adapted-args",
    "-Ywarn-inaccessible",
    "-Ywarn-infer-any",
    "-Ywarn-nullary-override",
    "-Ywarn-nullary-unit",
    "-Ywarn-unused-import",
    "-Ypartial-unification",
    "-Xmacro-settings:materialize-derivations",
  ),
  scalacOptions in (Compile, console) ~= {
    _ filterNot (_ == "-Ywarn-unused-import")
  },
  javacOptions ++= Seq(
    "-source", "11",
    "-target", "11"
  ),
  javaOptions ++= Seq(
    "-J-Xms1g",
    "-J-Xmx10g",
    "-J-Djdk.tls.client.protocols=\"TLSv1,TLSv1.1,TLSv1.2\""
  ),
  cancelable in Global := true,
  resolvers += "dnvriend" at "http://dl.bintray.com/dnvriend/maven",
  resolvers += Resolver.jcenterRepo,
  resolvers += Resolver.sonatypeRepo("snapshots"),
  resolvers += "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/",
)

code

package com.example.reproduction

import java.io.PrintWriter
import java.util.concurrent.TimeUnit

import akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream._
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.github.benmanes.caffeine.cache._
import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub._
import io.circe._
import io.circe.generic.semiauto._
import scalacache.caffeine.CaffeineCache
import scalacache.Entry

import scala.concurrent.duration.FiniteDuration
import scala.util.Random
import scala.concurrent.duration._

object reproduce extends App{

  object Foo{
    implicit val encoder: Encoder[Foo] = deriveEncoder
    implicit val decoder: Decoder[Foo] = deriveDecoder
  }
  case class Foo(id: Int, dummyPayload: String, bar: Bar)

  object Bar {
    implicit val encoder: Encoder[Bar] = deriveEncoder
    implicit val decoder: Decoder[Bar] = deriveDecoder
  }
  case class Bar(dummyPayload: String)

  def randomString(len: Int): String = Random.nextString(len)

  def randomFoo(id: Int): Foo = Foo(id, randomString(100), Bar(randomString(200)))

  case class RegisterFoo(foo: Foo)
  case class DeliverPubsusbFoo(foo: Foo)

  case class AckCacheConfig(
    minimumSize: Int = 100,
    maximumSize: Int = 500,
    recordStats: Boolean = false,
    timeToLive: FiniteDuration = 3 minutes,
  )

  case class AckActorEntry(
    fooRegistered: Option[Foo],
    fooReceived: Option[Foo],
  ) {
    def id: Int = fooReceived map {_.id} getOrElse (fooRegistered map {_.id} getOrElse -1)
    def isComplete(implicit actorSystem: ActorSystem): Boolean =
      (for {
        foo1 <- fooRegistered
        foo2 <- fooReceived
      } yield foo1 == foo2) exists logIncorrect

    private def logIncorrect(boolean: Boolean)(implicit actorSystem: ActorSystem): Boolean = {
      if(!boolean){
        actorSystem.log.error(s"Non equal: expected [$fooRegistered], got: [$fooReceived]")
      }
      boolean
    }
  }

  def setupCache(
    removalListner: RemovalListener[String, Entry[AckActorEntry]],
    config: AckCacheConfig = AckCacheConfig(),
  ): CaffeineCache[AckActorEntry] = {
    val underlying =
      Caffeine.newBuilder initialCapacity
        config.minimumSize maximumSize
        config.maximumSize expireAfterWrite
        (config.timeToLive.toMillis, TimeUnit.MILLISECONDS) removalListener // must be last method call according to the docs
        removalListner

    val underlyingWithOrWithoutStats = if (config.recordStats) underlying.recordStats else underlying
    CaffeineCache apply underlyingWithOrWithoutStats.build[String, Entry[AckActorEntry]]
  }

  def buildRemovalListener(
    implicit
    actorSystem: ActorSystem,
  ): RemovalListener[String, Entry[AckActorEntry]] =
    (_: String, value: Entry[AckActorEntry], cause: RemovalCause) => cause match {
      case RemovalCause.EXPLICIT =>
        actorSystem.log.info(s"Foo processing successful: ${value.value.id}")

      case RemovalCause.REPLACED => ()
      case RemovalCause.COLLECTED => ()
      case RemovalCause.EXPIRED =>
        actorSystem.log.error(s"block ack failed due to expiration: ${value.value}")

      case RemovalCause.SIZE =>
        actorSystem.log.error(s"block ack failed due to size: ${value.value}")

    }

  class VerifyActor(implicit
    actorSystem: ActorSystem,
    caffeineCache: CaffeineCache[AckActorEntry],
    config: AckCacheConfig,
  ) extends Actor {
    import scalacache.modes.sync._

    private def keyfunc(foo: Foo): String = foo.id.toString

    override def receive: Receive = {
      case RegisterFoo(foo) =>
        val key = keyfunc(foo)
        val fooEntry = (caffeineCache doGet keyfunc(foo) map { _.copy(fooRegistered = Some(foo))}) getOrElse AckActorEntry(Some(foo), None)
        if(fooEntry.isComplete)
          caffeineCache doRemove key
        else
          caffeineCache.doPut(key, fooEntry, Some(config.timeToLive))

      case DeliverPubsusbFoo(foo) =>
        val key = keyfunc(foo)
        val fooEntry = (caffeineCache doGet keyfunc(foo) map { _.copy(fooReceived = Some(foo))} ) getOrElse AckActorEntry(None, Some(foo))
        if(fooEntry.isComplete)
          caffeineCache doRemove key
        else
          caffeineCache.doPut(key, fooEntry, Some(config.timeToLive))
    }
  }

  import io.circe.syntax._

  implicit val actorSystem = ActorSystem()
  implicit val actorMaterializer = ActorMaterializer()
  val topic: String = ???
  val subscription: String = ???
  implicit val config = AckCacheConfig()
  val rl = buildRemovalListener
  implicit val cache = setupCache(rl, config)
  val ackActor = actorSystem.actorOf(Props(new VerifyActor))

  implicit def foo2PM(foo: Foo):PubsubMessage = {
    new PubsubMessage(ByteString.copyFromUtf8(foo.asJson.noSpaces))
  }

  val publish: Sink[Foo, NotUsed] = (Flow[Foo] map {x => PublishRequest(topic, Seq(foo2PM(x)))}) via GooglePubSub.publish(10) to Sink.ignore
  val out = new PrintWriter(System.out)

  //Rate at which issue emerges
  val publishMessages =
    (Source.tick(0 second, 50 millis, ()) statefulMapConcat(() => {
      var i = 0
      x: Unit => {
        i+=1
        randomFoo(i) :: Nil
      }
    }) map {x => ackActor ! RegisterFoo(x); x}) to publish

  val consumeMessages =
    GooglePubSub.subscribe(
      StreamingPullRequest()
        .withSubscription(subscription)
        .withStreamAckDeadlineSeconds(20),
      1 second
    ) via Flow[ReceivedMessage].map{ x =>
      import io.circe.parser._
      val v = DeliverPubsusbFoo((parse(x.message.get.data.toStringUtf8) flatMap { _.as[Foo]}).right.get)
      ackActor ! v
      AcknowledgeRequest().withSubscription(subscription).withAckIds(Seq(x.ackId))
    } to GooglePubSub.acknowledge(10)

  //just for convenience
  val exhaustValve =
    GooglePubSub.subscribe(
      StreamingPullRequest()
        .withSubscription(subscription)
        .withStreamAckDeadlineSeconds(20),
      1 second
    ) via Flow[ReceivedMessage].map{ x =>
      AcknowledgeRequest().withSubscription(subscription).withAckIds(Seq(x.ackId))
    } to GooglePubSub.acknowledge(10)

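  publishMessages.withAttributes(ActorAttributes.withSupervisionStrategy{ x =>
    println(x.getMessage)
    // printStackTrace returns Unit, so don't wrap it in println; flush because `out` does not autoflush
    x.printStackTrace(out)
    out.flush()
    Supervision.Stop
  }).run()

  consumeMessages.withAttributes(ActorAttributes.withSupervisionStrategy{ x =>
    println(x.getMessage)
    // same as above: print the trace to `out`, then flush so it actually reaches stdout
    x.printStackTrace(out)
    out.flush()
    Supervision.Stop
  }).run()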

//  exhaustValve.run()

}

2m commented 5 years ago

Interesting. HTTP/2 error code: INTERNAL_ERROR means a protocol level error. Similar errors have been reported some time ago on the grpc-java issue tracker: https://github.com/grpc/grpc-java/issues/2901

I do not think we can do much here, as we are only users of the gRPC libraries, other than keep those libraries as up to date as possible.
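
On the application side, one possible mitigation for the "retry on internal errors" expectation is to wrap the failing source in a restart-with-backoff stage, so that a reset rebuilds the stream instead of ending it. The following is only a minimal sketch under stated assumptions: `RestartSketch` and `flakySource` are hypothetical names, and `flakySource` is a stand-in for a call such as `GooglePubSub.subscribe(...)` that simulates the reset with a plain `RuntimeException`. It uses the `RestartSource.onFailuresWithBackoff` overload as found in Akka 2.5.x, the version from this report.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object RestartSketch {
  // Returns the elements seen downstream and how many times the source was (re)built.
  def run(): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("restart-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val attempts = new AtomicInteger(0)

    // Hypothetical stand-in for GooglePubSub.subscribe(...): the first two
    // attempts fail partway through, the third completes normally.
    def flakySource(): Source[Int, NotUsed] = {
      val attempt = attempts.incrementAndGet()
      if (attempt < 3)
        Source(1 to 3).map { i =>
          if (i == 3) throw new RuntimeException("simulated RST_STREAM") else i
        }
      else Source(List(100, 101))
    }

    // The factory is invoked again after each failure, with a growing delay
    // between attempts (100 ms minimum, 1 s maximum, 20% random jitter).
    val resilient = RestartSource.onFailuresWithBackoff(100.millis, 1.second, 0.2) { () =>
      flakySource()
    }

    try {
      val elements = Await.result(resilient.runWith(Sink.seq), 10.seconds)
      (elements, attempts.get())
    } finally Await.ready(system.terminate(), 10.seconds)
  }
}
```

With a real subscription, each restart would open a new streaming pull, so messages that were received but not yet acknowledged can be redelivered; downstream processing should tolerate duplicates.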

ivan71kmayshan27 commented 5 years ago

OK, if this is a redirection to the grpc-java developers, I will open an issue on the grpc-java issue tracker.

ennru commented 4 years ago

We can't do more about this than already stated in https://github.com/grpc/grpc-java/issues/6041#issuecomment-519713602 Closing.