vert-x3 / vertx-mqtt

Vert.x MQTT
Apache License 2.0

MQTT client uses the main event loop to execute an SSL handshake even if it is being invoked from a worker verticle #153

Closed. macjunk-ie closed this issue 4 years ago.

macjunk-ie commented 4 years ago

Scenario: I have an SSL-enabled ActiveMQ broker and two types of clients:

1) A standard verticle that publishes a single message to the broker
2) A worker verticle that publishes a single message to the broker

Issue: even when the client is created from a worker verticle, the SSL handshake is executed on the main event loop, as the logs below show.

Log when run as a standard verticle:

****************** As a standard verticle! ****************** 
16:32:02.850 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
16:32:02.851 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
16:32:02.851 [vert.x-eventloop-thread-0] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3d355d16
16:32:02.861 [vert.x-eventloop-thread-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Trying to connect with localhost:8883
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 32
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 32
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:32:02.887 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:32:02.888 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:32:02.893 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:32:02.893 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:32:02.893 [vert.x-eventloop-thread-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:32:03.224 [vert.x-eventloop-thread-0] DEBUG io.netty.handler.ssl.JdkSslContext - Default protocols (JDK): [TLSv1.2, TLSv1.1, TLSv1] 
16:32:03.224 [vert.x-eventloop-thread-0] DEBUG io.netty.handler.ssl.JdkSslContext - Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
16:32:03.254 [vert.x-eventloop-thread-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 24934 (auto-detected)
16:32:03.257 [vert.x-eventloop-thread-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
16:32:03.335 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
16:32:03.335 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:32:03.335 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:32:03.335 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:32:03.399 [vert.x-eventloop-thread-0] DEBUG io.netty.handler.ssl.SslHandler - [id: 0xbe618f0d, L:/127.0.0.1:64689 - R:localhost/127.0.0.1:8883] HANDSHAKEN: TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA
16:32:03.409 [vert.x-eventloop-thread-0] INFO  io.vertx.mqtt.impl.MqttClientImpl - Connection with localhost:8883 established successfully
16:32:03.419 [vert.x-eventloop-thread-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Sending packet MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=30], payload=MqttConnectPayload[clientIdentifier=7134541c-aad6-4316-bd91-b49c5d10c77b, willTopic=null, willMessage=null, userName=null, password=null]]
16:32:03.428 [vert.x-eventloop-thread-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Incoming packet MqttConnAckMessage[fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=false], payload=]
16:32:03.429 [vert.x-eventloop-thread-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Sending packet MqttPublishMessage[fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=0], variableHeader=MqttPublishVariableHeader[topicName=test1/data, packetId=1], payload=UnpooledHeapByteBuf(ridx: 0, widx: 12, cap: 12/12)]
16:32:03.429 [vert.x-eventloop-thread-0] INFO  SSLClient - Published!
16:32:03.432 [vert.x-eventloop-thread-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Incoming packet MqttPubAckMessage[fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttMessageIdVariableHeader[messageId=1], payload=]

Log when run as a worker verticle:

****************** As a worker verticle! ****************** 
16:31:41.420 [Custom-Pool-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
16:31:41.421 [Custom-Pool-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
16:31:41.422 [Custom-Pool-0] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@2c348430
16:31:41.434 [Custom-Pool-0] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Trying to connect with localhost:8883
16:31:41.457 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 32
16:31:41.457 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 32
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:31:41.458 [Custom-Pool-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:31:41.463 [Custom-Pool-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:31:41.463 [Custom-Pool-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:31:41.463 [Custom-Pool-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:31:41.765 [Custom-Pool-0] DEBUG io.netty.handler.ssl.JdkSslContext - Default protocols (JDK): [TLSv1.2, TLSv1.1, TLSv1] 
16:31:41.765 [Custom-Pool-0] DEBUG io.netty.handler.ssl.JdkSslContext - Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
16:31:41.793 [Custom-Pool-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 24928 (auto-detected)
16:31:41.795 [Custom-Pool-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
16:31:41.880 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
16:31:41.881 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:31:41.881 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:31:41.881 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:31:41.947 [vert.x-eventloop-thread-0] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x7ee9097e, L:/127.0.0.1:64683 - R:localhost/127.0.0.1:8883] HANDSHAKEN: TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA
16:31:41.956 [Custom-Pool-1] INFO  io.vertx.mqtt.impl.MqttClientImpl - Connection with localhost:8883 established successfully
16:31:41.964 [Custom-Pool-1] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Sending packet MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=30], payload=MqttConnectPayload[clientIdentifier=6267fa5a-ea2c-4201-86a5-ea77b6ea7858, willTopic=null, willMessage=null, userName=null, password=null]]
16:31:41.972 [Custom-Pool-2] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Incoming packet MqttConnAckMessage[fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=false], payload=]
16:31:41.974 [Custom-Pool-2] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Sending packet MqttPublishMessage[fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=0], variableHeader=MqttPublishVariableHeader[topicName=test1/data, packetId=1], payload=UnpooledHeapByteBuf(ridx: 0, widx: 12, cap: 12/12)]
16:31:41.974 [Custom-Pool-2] INFO  SSLClient - Published!
16:31:41.988 [Custom-Pool-3] DEBUG io.vertx.mqtt.impl.MqttClientImpl - Incoming packet MqttPubAckMessage[fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttMessageIdVariableHeader[messageId=1], payload=]

The client code:


```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;

public class SSLClient {

    private static final Logger LOG = LoggerFactory.getLogger(SSLClient.class);
    public static void main(String[] args) throws InterruptedException {
        System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
        worker();
//        standard();
    }
    private static void worker() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        CountDownLatch latch = new CountDownLatch(1);
        LOG.info("\n\n****************** As a worker verticle! ****************** ");
        vertx.deployVerticle(PublishVerticle.class,
            new DeploymentOptions().setWorker(true).setWorkerPoolName("Custom-Pool").setWorkerPoolSize(5), res -> {
                if (res.succeeded()) {
                    latch.countDown();
                } else {
                    res.cause().printStackTrace();
                }
            });
        latch.await();
    }
    private static void standard() throws InterruptedException {
        CountDownLatch latch1 = new CountDownLatch(1);
        Vertx vertx = Vertx.vertx();
        LOG.info("\n\n****************** As a standard verticle! ****************** ");
        vertx.deployVerticle(PublishVerticle.class, new DeploymentOptions(), res -> {
            if (res.succeeded()) {
                latch1.countDown();
            } else {
                res.cause().printStackTrace();
            }
        });
        latch1.await();
    }
    static public class PublishVerticle extends AbstractVerticle {
        @Override
        public void start(Promise<Void> startFuture) throws Exception {
            MqttClient mqttClient = MqttClient.create(vertx, options());
            mqttClient.connect(8883, "localhost", result -> {
                if (result.succeeded()) {
                    mqttClient
                        .publish("test1/data", Buffer.buffer("Test Message"), MqttQoS.AT_LEAST_ONCE,
                            false, false);
                    LOG.info("Published!");
                } else {
                    result.cause().printStackTrace();
                }
                startFuture.complete();
            });
        }
    }
    private static MqttClientOptions options() {
        String privateKey = "-----BEGIN PRIVATE KEY-----\n"
            + "........."
            + "-----END PRIVATE KEY-----";
        String publicCert = "-----BEGIN CERTIFICATE-----\n"
            + "............"
            + "-----END CERTIFICATE-----\n";
        return new MqttClientOptions()
            .setSsl(true)
            .setTrustAll(true)
            .setKeyCertOptions(
                new PemKeyCertOptions()
                    .setKeyValue(Buffer.buffer(privateKey))
                    .setCertValue(Buffer.buffer(publicCert)))
            .setClientId(UUID.randomUUID().toString());
    }
}
```

POM file dependencies:
```xml
<dependencies>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mqtt</artifactId>
      <version>3.8.5</version>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>3.8.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.9.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.9.1</version>
    </dependency>
  </dependencies>
```
Sammers21 commented 4 years ago

@macjunk-ie, I think it would be better if you created a sample application with steps to reproduce the issue.

vietj commented 4 years ago

I believe this is a fair point: we should configure the underlying NetSocket to always deliver events on the event loop, and then handle the worker delegation in the MQTT layer when crossing into application code.

vietj commented 4 years ago

@macjunk-ie the SSL handshake is performed by Netty and will always be done on the event loop, regardless of whether the Vert.x client is used from a worker or an event-loop verticle.
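This behavior can be illustrated with a plain-JDK sketch (no Vert.x or Netty involved; the executor and thread names here are made up for illustration): a single-threaded executor standing in for the Netty event loop runs every task on its own thread, no matter which thread submits the work — which is why the handshake always lands on the event loop even when triggered from a worker.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EventLoopThreadDemo {
    public static void main(String[] args) throws Exception {
        // Single-threaded executor standing in for a Netty event loop.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "fake-eventloop-0"));
        // Pool standing in for the Vert.x worker pool ("Custom-Pool").
        ExecutorService workerPool = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "fake-worker"));

        // Kick off the "handshake" from a worker thread: the task still
        // executes on the event-loop thread, not the submitting thread.
        String handshakeThread = workerPool.submit(
                () -> eventLoop.submit(() -> Thread.currentThread().getName()).get()
        ).get();

        System.out.println(handshakeThread); // prints "fake-eventloop-0"
        eventLoop.shutdown();
        workerPool.shutdown();
    }
}
```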

vietj commented 4 years ago

When using a client from a worker verticle, the verticle and the actual client code communicate via message passing, so this behavior is expected.
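The message-passing hand-off can be sketched with plain JDK executors (a hypothetical stand-in, not the actual Vert.x internals): "socket" events arrive on the single event-loop thread and are re-dispatched to the worker context, so the application handler observes them on a worker thread, with ordering preserved — much like the CONNACK and PUBACK appearing on `Custom-Pool` threads in the log above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerDeliveryDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);

        // "Socket" events (CONNACK, then PUBACK) arrive on the loop thread and
        // are handed to the worker context by message passing; the handler
        // therefore runs on a worker thread, in arrival order.
        for (String event : new String[]{"CONNACK", "PUBACK"}) {
            eventLoop.execute(() -> worker.execute(() -> {
                received.add(event + "@" + Thread.currentThread().getName());
                done.countDown();
            }));
        }
        done.await();
        System.out.println(received); // [CONNACK@worker, PUBACK@worker]
        eventLoop.shutdown();
        worker.shutdown();
    }
}
```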