apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0
1.13k stars, 137 forks

Add Flow#onBackpressureDrop() operator #1363

Open. He-Pin opened this issue 1 month ago.

He-Pin commented 1 month ago

Motivation:

I want to do some rate limiting and fail only the individual task that cannot be accepted, e.g. when combined with mapAsync. Currently, there seems to be no way to observe the backpressure.

The `onBackpressure` callback should only be called when the downstream backpressures.

        queue = Source.<TranslateTask>queue(10240)
            .groupedWeightedWithin(
                dynamicDictionaryConfigs.getMaxBatchTokens(), // max number of characters per batch, up to 1000
                dynamicDictionaryConfigs.getMaxBatchSize(), // max batch size, up to 50
                costFn, // character-count cost function
                Duration.ofMillis(dynamicDictionaryConfigs.getMaxBatchIntervalInMillis())) // max aggregation time, e.g. 3ms
            .onBackpressure(task -> task.fail(...)) // proposed operator, not yet in Pekko
            .buffer(dynamicDictionaryConfigs.getOvsQpsLimit() * 2, OverflowStrategy.backpressure())
            .toMat(Sink.foreach(this::batchTranslate), Keep.left())
            ....
            .run(actorSystem);
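A plain-Java sketch (no Pekko involved) of the behavior the pipeline above is after. Placing the proposed operator in front of a bounded buffer amounts to "when the buffer is full, drop the newest element and tell me which one", so the caller can fail that single task. `DropReportingBuffer`, `offer` and `onDropped` are hypothetical names for illustration only, not Pekko API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DropReportingBufferDemo {

    // Hypothetical bounded buffer: instead of backpressuring when full, it hands the
    // rejected (newest) element to a callback so the caller can fail that single task.
    static final class DropReportingBuffer<T> {
        private final int capacity;
        private final ArrayDeque<T> queue = new ArrayDeque<>();
        private final Consumer<? super T> onDropped;

        DropReportingBuffer(int capacity, Consumer<? super T> onDropped) {
            if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
            this.capacity = capacity;
            this.onDropped = onDropped;
        }

        // Returns true if the element was buffered, false if it was dropped and reported.
        boolean offer(T elem) {
            if (queue.size() >= capacity) {
                onDropped.accept(elem); // buffer full: report the incoming element as dropped
                return false;
            }
            queue.addLast(elem);
            return true;
        }

        // Removes the oldest buffered element; returns null when the buffer is empty.
        T poll() {
            return queue.pollFirst();
        }

        int size() {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        List<Integer> failed = new ArrayList<>();
        DropReportingBuffer<Integer> buffer = new DropReportingBuffer<>(3, failed::add);
        for (int task = 1; task <= 5; task++) {
            buffer.offer(task);
        }
        System.out.println("buffered=" + buffer.size() + " failed=" + failed); // buffered=3 failed=[4, 5]
    }
}
```

In the real pipeline the "failed" callback would be where `task.fail(...)` is called; the elements already in the buffer keep flowing to `batchTranslate` unaffected.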

What do you think?

In reactor-core, there is `Flux#onBackpressureDrop(Consumer<? super T> onDropped)`:

This is what I would like to make use of.

He-Pin commented 3 weeks ago

Update:

@InternalApi private[pekko] trait Buffer[T] {
  def capacity: Int
  def used: Int
  def isFull: Boolean
  def isEmpty: Boolean
  def nonEmpty: Boolean

  def enqueue(elem: T): Unit
  def dequeue(): T

  def peek(): T
  def clear(): Unit
  def dropHead(): Unit
  def dropTail(): Unit
}

As the current `Buffer`'s `clear`, `dropHead` and `dropTail` just return `Unit`, changing them to return `Seq[T]` or `T` would help, but that would involve: 1. a large amount of change across the codebase, and 2. a performance hit in the `clear` case.
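A hypothetical sketch of the change being weighed here, written in Java rather than against Pekko's internal Scala `Buffer` trait: a fixed-size ring buffer whose `dropHead`, `dropTail` and `clear` return the removed element(s). `ReturningBuffer` is an illustrative name, and Pekko's actual buffer implementations may be structured differently. It shows where the `clear` concern comes from: returning the removed elements means allocating a collection and copying every remaining element on each call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

public class ReturningBufferDemo {

    // Hypothetical fixed-size ring buffer whose drop operations return what they removed,
    // so a drop callback could observe the dropped elements.
    static final class ReturningBuffer<T> {
        private final Object[] slots;
        private int head = 0; // index of the oldest element
        private int used = 0; // number of buffered elements

        ReturningBuffer(int capacity) {
            slots = new Object[capacity];
        }

        boolean isFull() { return used == slots.length; }
        boolean isEmpty() { return used == 0; }

        void enqueue(T elem) {
            if (isFull()) throw new IllegalStateException("buffer full");
            slots[(head + used) % slots.length] = elem;
            used++;
        }

        // Removes and returns the oldest element.
        @SuppressWarnings("unchecked")
        T dropHead() {
            if (isEmpty()) throw new NoSuchElementException();
            T elem = (T) slots[head];
            slots[head] = null;
            head = (head + 1) % slots.length;
            used--;
            return elem;
        }

        // Removes and returns the newest element.
        @SuppressWarnings("unchecked")
        T dropTail() {
            if (isEmpty()) throw new NoSuchElementException();
            int tail = (head + used - 1) % slots.length;
            T elem = (T) slots[tail];
            slots[tail] = null;
            used--;
            return elem;
        }

        // Returning the removed elements forces an allocation plus a copy of every
        // remaining element, work a clear() that returns nothing would not need.
        @SuppressWarnings("unchecked")
        List<T> clear() {
            List<T> removed = new ArrayList<>(used);
            for (int i = 0; i < used; i++) {
                int idx = (head + i) % slots.length;
                removed.add((T) slots[idx]);
                slots[idx] = null;
            }
            head = 0;
            used = 0;
            return removed;
        }
    }

    public static void main(String[] args) {
        ReturningBuffer<String> buf = new ReturningBuffer<>(4);
        buf.enqueue("a"); buf.enqueue("b"); buf.enqueue("c"); buf.enqueue("d");
        System.out.println(buf.dropHead()); // a
        System.out.println(buf.dropTail()); // d
        System.out.println(buf.clear());    // [b, c]
    }
}
```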

So it seems better to just add a new, dedicated `onBackpressureDrop` implementation, as reactor-core does?
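A dedicated operator would not need the `Buffer` API at all. The plain-Java model below is a hypothetical sketch of its semantics, roughly in the spirit of reactor-core's `Flux#onBackpressureDrop(Consumer)`: an element arriving while the downstream has not signaled demand is handed to `onDropped` instead of being held. The class name and the `onPush`/`onPull` methods only echo the names of Pekko's `GraphStage` handler callbacks; this is not a Pekko implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BackpressureDropDemo {

    // Hypothetical model of an onBackpressureDrop stage. It only tracks downstream demand;
    // a real stage would also pull upstream eagerly so the upstream never sees backpressure.
    static final class OnBackpressureDrop<T> {
        private final Consumer<? super T> downstream;
        private final Consumer<? super T> onDropped;
        private boolean downstreamDemand = false;

        OnBackpressureDrop(Consumer<? super T> downstream, Consumer<? super T> onDropped) {
            this.downstream = downstream;
            this.onDropped = onDropped;
        }

        // Downstream signals demand for one element (analogous to onPull).
        void onPull() {
            downstreamDemand = true;
        }

        // Upstream delivers an element (analogous to onPush).
        void onPush(T elem) {
            if (downstreamDemand) {
                downstreamDemand = false;
                downstream.accept(elem); // downstream is ready: forward the element
            } else {
                onDropped.accept(elem);  // downstream is backpressuring: drop and report it
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> emitted = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();
        OnBackpressureDrop<Integer> stage = new OnBackpressureDrop<>(emitted::add, dropped::add);

        stage.onPush(1); // no demand yet, so it is dropped
        stage.onPull();
        stage.onPush(2); // demand available, so it is emitted
        stage.onPush(3); // demand already consumed, so it is dropped
        System.out.println("emitted=" + emitted + " dropped=" + dropped); // emitted=[2] dropped=[1, 3]
    }
}
```

The callback fires only while the downstream is backpressuring, which matches the requirement in the motivation; placing a `buffer` after it decides how much slack exists before dropping starts.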
