vert-x3 / issues

Apache License 2.0
37 stars 7 forks source link

Messages are received in a disorderly order #635

Closed wjw465150 closed 1 year ago

wjw465150 commented 1 year ago

Questions

When there are multiple consumers at one address,Messages are received in a disorderly order!

Version

Vertx 4.4.2

Test Source Code

  1. SenderVerticle
    
    package org.wjw.vertx.eventbus;

import java.text.MessageFormat; import java.util.concurrent.TimeUnit;

import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.MessageProducer; import io.vertx.core.json.JsonObject;

public class SenderVerticle extends AbstractVerticle { private Logger logger;

public SenderVerticle() { this.logger = LoggerFactory.getLogger(this.getClass()); }

@Override public void start(Promise startPromise) { EventBus eventBus = vertx.eventBus();

for (int i = 0; i < 10; i++) {
  JsonObject jsonMessage = new JsonObject().put("seq", i);

  MessageProducer<Object> sender = eventBus.sender("msg_seq");
  sender.write(jsonMessage).onSuccess(v -> {
    logger.info(MessageFormat.format("send_msg: {0}", jsonMessage.getInteger("seq")));
  }).onFailure(ex -> {
    logger.info(ex.getMessage());
  });

}

vertx.setTimer(10 * 1000L, id -> {
  startPromise.complete();
});

} }


2. ConsumerVerticle
```java
package org.wjw.vertx.eventbus;

import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;

public class ConsumerVerticle extends AbstractVerticle {
  //日志对象
  private Logger logger;

  public ConsumerVerticle() {
    this.logger = LoggerFactory.getLogger(this.getClass());
  }

  @Override
  public void start(Promise<Void> startPromise) {
    EventBus eventBus = vertx.eventBus();

    MessageConsumer<JsonObject> messageConsumer = eventBus.consumer("msg_seq");
    messageConsumer.handler(message -> {
      JsonObject jsonMessage = message.body();
      logger.info(MessageFormat.format("receive_msg: {0}", jsonMessage.encode()));
    });

    messageConsumer.completionHandler(v -> {
      startPromise.complete();
    });

  }

}
  1. Test Code
    
    package org.wjw.vertx.eventbus;

import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.function.Supplier;

import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import io.vertx.core.CompositeFuture; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Verticle; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.junit5.RunTestOnContext; import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext;

@ExtendWith(VertxExtension.class) @Timeout(value = 10, timeUnit = TimeUnit.MINUTES) public class TestEventBusOrder { private static Logger logger = LoggerFactory.getLogger(TestEventBusOrder.class);

static VertxOptions options = new VertxOptions(); static { long blockedThreadCheckInterval = 60 60 1000L; if (System.getProperties().getProperty("vertx.options.blockedThreadCheckInterval") != null) { blockedThreadCheckInterval = Long.valueOf(System.getProperties().getProperty("vertx.options.blockedThreadCheckInterval")); } options.setBlockedThreadCheckInterval(blockedThreadCheckInterval); }

@RegisterExtension static RunTestOnContext rtoc = new RunTestOnContext(options);

@BeforeAll public static void setUp() { // }

@AfterAll public static void endUp() { rtoc.vertx().close(); }

@Test @Timeout(value = 10, timeUnit = TimeUnit.MINUTES) void verticle_deployed(VertxTestContext testContext) throws Throwable {

Promise<String> startPromise = Promise.promise();

Vertx vertx = rtoc.vertx();

DeploymentOptions serverDeploymentOptions = new DeploymentOptions()
    .setInstances(2);
DeploymentOptions dataDeploymentOptions   = new DeploymentOptions()
    .setInstances(1);

CompositeFuture
    .all(Arrays.asList(
        deployVerticleIfNeeded(vertx, () -> new ConsumerVerticle(), serverDeploymentOptions),
        deployVerticleIfNeeded(vertx, () -> new SenderVerticle(), dataDeploymentOptions)
    ))
    .onSuccess(compositeFuture -> {
      startPromise.complete();
      logger.info("Start OK");

      testContext.completeNow();
    })
    .onFailure(throwable -> {
      logger.error("Start Failure!  ", throwable);
      startPromise.fail(throwable);

      testContext.failNow(throwable);          
    });

}

private Future deployVerticleIfNeeded(Vertx vertx, Supplier verticleSupplier, DeploymentOptions options) { if (options.getInstances() == 0) { return Future.succeededFuture(""); } else { return vertx.deployVerticle(verticleSupplier, options); } }

}


4. log out

2023-06-11 13:03:11.706 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 0 2023-06-11 13:03:11.709 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 1 2023-06-11 13:03:11.710 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 2 2023-06-11 13:03:11.710 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 3 2023-06-11 13:03:11.711 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 4 2023-06-11 13:03:11.711 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 5 2023-06-11 13:03:11.711 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 6 2023-06-11 13:03:11.712 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 7 2023-06-11 13:03:11.712 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 8 2023-06-11 13:03:11.713 [vert.x-eventloop-thread-3] INFO org.wjw.vertx.eventbus.SenderVerticle - [lambda$0,33] - send_msg: 9 2023-06-11 13:03:11.740 [vert.x-eventloop-thread-1] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":0} 2023-06-11 13:03:11.740 [vert.x-eventloop-thread-2] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":1} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-2] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":3} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-1] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":2} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-2] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":5} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-2] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":7} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-1] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":4} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-2] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":9} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-1] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":6} 2023-06-11 13:03:11.741 [vert.x-eventloop-thread-1] INFO org.wjw.vertx.eventbus.ConsumerVerticle - [lambda$0,29] - receive_msg: {"seq":8} 2023-06-11 13:03:21.715 [vert.x-eventloop-thread-0] INFO org.wjw.vertx.eventbus.TestEventBusOrder - [lambda$2,77] - Start OK



### Extra

* Anything that can be relevant such as OS version, JVM version
vietj commented 1 year ago

The only order we can guarantee are messages received on the same event-loop.

When messages are processed on different event-loops the order is not guaranteed, e.g an event loop might have to do other work before it can deliver the message to the handler, so there is always a random effect that will make you observe messages received in a different order on different event loops.