sky-uk / kafka-message-scheduler

Scheduler for low-frequency and long-term scheduling of delayed messages to Kafka topics.
BSD 3-Clause "New" or "Revised" License

PublisherActor - Failed to enqueue errors #89

Open pbuz opened 4 years ago

pbuz commented 4 years ago

We are trying to use KMS to schedule messages from a topic to which we produce 500 messages/second. Unfortunately, our test fails with version 0.22.0 of the KMS Docker image: the pod keeps restarting and the logs fill up with this error:

[2019-12-23 14:02:36,814] alto-kms ERROR [kafka-message-scheduler-akka.actor.default-dispatcher-21] com.sky.kms.actors.PublisherActor - Failed to enqueue 8450874c-a1a2-4895-9fc1-4f098d699a38
java.lang.IllegalStateException: You have to wait for the previous offer to be resolved to send another request
    at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:101)
    at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:114)
    at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:108)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:452)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:481)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:581)
    at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
    at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
    at akka.kamon.instrumentation.ActorMonitors$$anon$1.$anonfun$processMessage$1(ActorMonitor.scala:134)
    at kamon.Kamon$.withContext(Kamon.scala:120)
    at akka.kamon.instrumentation.ActorMonitors$$anon$1.processMessage(ActorMonitor.scala:134)
    at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorInstrumentation.scala:45)
    at akka.actor.ActorCell.invoke(ActorCell.scala:574)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at kamon.executors.Executors$InstrumentedExecutorService$$anon$7.run(Executors.scala:270)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Note that the same test runs successfully at 250 messages/second.

manuelcueto commented 4 years ago

Thanks. We'll take a look.

pbuz commented 4 years ago

Hi @manuelcueto, do you have any updates on this?

manuelcueto commented 4 years ago

Some thoughts on it:

  private def receiveWithQueue(queue: ScheduleQueue): Receive = {
    case Trigger(scheduleId, schedule) =>
      queue.offer((scheduleId, messageFrom(schedule))) onComplete {
        case Success(QueueOfferResult.Enqueued) =>
          log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show)
        case Success(res) =>
          log.warning(ScheduleQueueOfferResult(scheduleId, res).show)
        case Failure(t) =>
          log.error(t, s"Failed to enqueue $scheduleId")
          self ! DownstreamFailure(t)
      }
  }

This is the problematic piece of code: we offer to the queue and get a Future back, which is how Akka handles backpressure. The Future will not complete until the buffer can hold another element. Since we're not waiting for the Future to complete here, if we call offer again while the buffer is full and the previous offer is still unresolved, the queue fails, and we currently don't handle that gracefully. A solution would be to use context.become to switch to another receive that waits for completion while stashing incoming requests and, once the offer has completed, unstashes them and goes back to the 'available' state.
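
A minimal sketch of that become/stash idea, for illustration only. It is not the project's PublisherActor: the actor name SerialisedPublisher, the simplified Trigger message and the (String, String) element type are all hypothetical, and it assumes classic Akka actors with a SourceQueue created with OverflowStrategy.backpressure. The result of offer is piped back to the actor so that only one offer is ever in flight.

```scala
import akka.actor.{Actor, ActorLogging, Stash, Status}
import akka.pattern.pipe
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueue

// Hypothetical, simplified stand-in for the scheduler's Trigger message.
final case class Trigger(scheduleId: String, payload: String)

// Hypothetical actor that allows at most one pending offer at a time.
class SerialisedPublisher(queue: SourceQueue[(String, String)])
    extends Actor with Stash with ActorLogging {

  import context.dispatcher // ExecutionContext required by pipeTo

  override def receive: Receive = available

  // No offer is pending, so calling queue.offer is safe.
  private def available: Receive = {
    case Trigger(id, payload) =>
      // pipeTo sends the Future's result (or Status.Failure) back to self.
      queue.offer((id, payload)).pipeTo(self)
      context.become(awaitingOffer(id))
  }

  // An offer is pending: stash further triggers until it resolves.
  private def awaitingOffer(id: String): Receive = {
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued {}", id)
      resume()
    case other: QueueOfferResult =>
      log.warning("Offer for {} returned {}", id, other)
      resume()
    case Status.Failure(t) =>
      // Simplification: the real actor signals a downstream failure here.
      log.error(t, "Failed to enqueue {}", id)
      resume()
    case _: Trigger =>
      stash()
  }

  // Replay stashed triggers and return to the 'available' state.
  private def resume(): Unit = {
    unstashAll()
    context.become(available)
  }
}
```

The trade-off in this sketch is that triggers arriving faster than the stream can drain accumulate in the stash rather than in the queue buffer, so the stash capacity would still need to be bounded or monitored.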

pbuz commented 4 years ago

@manuelcueto we experimented with the value of scheduler.publisher.queue-buffer-size to see whether we could get the test to pass, but it still fails at 500 messages/second.

lacarvalho91 commented 4 years ago

Have you tried setting the buffer size to max int? I believe that's how it is configured in MAP @manuelcueto

paulaylingdev commented 4 years ago

@pbuz could you post the deployment configuration you are using where you see these issues?