eclipse-vertx / vert.x

Vert.x is a tool-kit for building reactive applications on the JVM
http://vertx.io
Other
14.25k stars 2.06k forks source link

Class Gateway EventBus address → BlockingQueue #4837

Open magicprinc opened 1 year ago

magicprinc commented 1 year ago

If one need to process Messages from EventBus in batches in blocking code, he has to implement quite complex code with hand-made batch (ArrayList) creation, timer/timeout and blocking code execution.

It would be great to have an alternative implementation of MessageConsumerImpl with less "layers" (aka Gateway Class) for message-batching purposes.

See proof of concept with blazing speed and low memory consumption! https://github.com/magicprinc/vert.x/commit/3de41f0bb069c230b8fe27987f18a2ecbcad8304

vietj commented 1 year ago

what is wrong with using a consumer and fetching from it by batches using fetch(n) to fill the blocking queue, given that you know the queue capacity and will never block ?

magicprinc commented 1 year ago

My initial wish was actually to have batches in message consumer.

But all solutions look more complicated and with an unnecessary timeout. Nothing close to:

  public static <T> int lazyDrainTo2 (BlockingQueue<T> fromQ, Collection<? super T> toC, int maxBatchSize, int waitTimeoutMills) throws InterruptedException {
    if (maxBatchSize > 1){
      int n = fromQ.drainTo(toC, maxBatchSize);
      if (n > 0){ return n;}// non empty queue, take and run!
    }
    // the queue is empty: wait but as little as possible: instant reaction
    T item = fromQ.poll(waitTimeoutMills, TimeUnit.MILLISECONDS);
    if (item != null){
      toC.add(item);
      return 1;
    }
    //else still nothing
    return 0;
  }

Probably MessageConsumerImpl#pending could be not private?...

So I ended with BlockingQueue and good-old Thread to drain batches and insert into database.

But then MessageConsumerImpl's extra steps to run code in proper thread are in vain, so I made this class