vert-x3 / vertx-lang-kotlin

Vert.x for Kotlin
Apache License 2.0

Coroutine dispatcher doesn't execute commands on the duplicated context #234

Closed tsegismont closed 1 year ago

tsegismont commented 1 year ago

Reproducer (failing test):

  @Test
  fun `coroutine dispatcher executes commands on duplicated context`(testContext: TestContext) {
    val latch = testContext.async()
    val context = (vertx as VertxInternal).getOrCreateContext()
    val duplicatedContext = context.duplicate()
    duplicatedContext.runOnContext {
      duplicatedContext.putLocal("foo", "bar")
      GlobalScope.launch(context.dispatcher()) {
        delay(100)
        testContext.assertEquals(duplicatedContext, ContextInternal.current())
        testContext.assertEquals(duplicatedContext.getLocal("foo"), "bar")
        latch.complete()
      }
    }
  }

This is particularly annoying when the duplicated context holds local data (e.g. for tracing or logging).

tsegismont commented 1 year ago

Fixed by #236

bbyk commented 1 year ago

@tsegismont I think you have introduced a bug. With Vert.x, you want to set handlers directly inside the callback, not in a deferred task. If you are on a duplicated context, ContextInternal.current()?.unwrap() returns the original context, and

        if (current != vertxContext || !current.inThread()) {
            vertxContext.runOnContext { command.run() }

always takes the true branch of the if statement, and therefore always defers the command with vertxContext.runOnContext.
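
To make the always-true branch concrete, here is a minimal toy model. It is a sketch only: `ToyContext` and `defersWithUnwrap` are hypothetical names, not Vert.x API, and it assumes that `current` in the snippet above is the unwrapped current context and that the dispatcher was created from the duplicated context.

```kotlin
// Toy stand-in for a Vert.x context (hypothetical, NOT the real ContextInternal).
class ToyContext(private val parent: ToyContext? = null) {
    // A duplicate always points back at the root context.
    fun duplicate(): ToyContext = ToyContext(unwrap())

    // Returns the root context for a duplicate, or this context if it is the root.
    fun unwrap(): ToyContext = parent ?: this
}

// Mirrors the shape of the quoted condition, assuming `current` was unwrapped first.
// Returns true when the command would be deferred with runOnContext.
fun defersWithUnwrap(current: ToyContext?, dispatcherContext: ToyContext, inThread: Boolean): Boolean {
    val unwrapped = current?.unwrap()
    return unwrapped != dispatcherContext || !inThread
}

fun main() {
    val root = ToyContext()
    val duplicate = root.duplicate()

    // Dispatcher created from the duplicate, coroutine resumed on the duplicate:
    // the unwrapped context is the root, which never equals the duplicate.
    println(defersWithUnwrap(duplicate, duplicate, inThread = true)) // true

    // Dispatcher created from the root, coroutine resumed on the root.
    println(defersWithUnwrap(root, root, inThread = true)) // false
}
```

In this model, a dispatcher obtained from a duplicated context defers every command, even when the caller is already on that same context and thread.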

Consider the following test:

  @Test
  fun `this test never completes`(testContext: TestContext) {
    val latch = testContext.async()
    val context = (vertx as VertxInternal).getOrCreateContext()
    val duplicatedContext = context.duplicate()
    val httpClient = vertx.createHttpClient()
    duplicatedContext.runOnContext {
      GlobalScope.launch(Vertx.currentContext().dispatcher()) {
        val resp = httpClient.request(RequestOptions().setMethod(HttpMethod.GET).setAbsoluteURI("https://example.com"))
          .await().apply { end().await() }
          .response()
          .await()
        val body = resp.body().await().toString()
        latch.complete()
      }
    }
  }

With the current fix in place, the test never completes. By the time the coroutine reaches resp.body().await(), it is no longer running inside the callback for response().await(): that callback only scheduled vertxContext.runOnContext { command.run() } and then returned.
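
The timing problem can be sketched with a toy event loop. This is an illustration under stated assumptions, not Vert.x internals: `ToyLoop`, `ToyResponse` and `deliver` are hypothetical names, and the model assumes the body event fires right after the response callback returns and is dropped if no handler is set at that point.

```kotlin
// Hypothetical single-threaded event loop: queued tasks run in FIFO order.
class ToyLoop {
    private val queue = ArrayDeque<() -> Unit>()
    fun runOnContext(task: () -> Unit) { queue.addLast(task) }
    fun drain() { while (queue.isNotEmpty()) queue.removeFirst()() }
}

// Hypothetical response: the body event is lost if no handler is set when it fires.
class ToyResponse {
    var bodyHandler: ((String) -> Unit)? = null
    fun emitBody(body: String) { bodyHandler?.invoke(body) }
}

// Returns the body seen by the handler, or null if the event was missed.
fun deliver(loop: ToyLoop, deferContinuation: Boolean): String? {
    var received: String? = null
    val response = ToyResponse()
    loop.runOnContext {
        // Stands in for the coroutine continuation that sets the body handler.
        val continuation = { response.bodyHandler = { received = it } }
        // Either resume inline (inside the callback) or defer to a later task.
        if (deferContinuation) loop.runOnContext(continuation) else continuation()
        // Assumption of this model: the body arrives before the deferred task runs.
        response.emitBody("hello")
    }
    loop.drain()
    return received
}

fun main() {
    println(deliver(ToyLoop(), deferContinuation = false)) // hello
    println(deliver(ToyLoop(), deferContinuation = true))  // null
}
```

When the continuation is deferred, the handler is set only after the event has already fired, so a coroutine awaiting it would never resume.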

I think what you intended was an implementation along these lines:

    override fun execute(command: Runnable) {
        val current = ContextInternal.current()
        //
        // if exactly the same context or duplicate of the dispatcher's and matches context's designated thread mode
        // execute inline thus guarantee that vert.x callback sets all further handlers
        //
        if ((current == vertxContext || current?.unwrap() == vertxContext) && current.inThread()) {
            command.run()
        } else {
            vertxContext.runOnContext { command.run() }
        }
    }
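
As a rough check of which cases the condition above runs inline, here is a toy model of it. `ToyContext` and `runsInline` are hypothetical names, not Vert.x API, and `inThread` is passed in as a plain flag instead of being queried from the context.

```kotlin
// Toy stand-in for a Vert.x context (hypothetical, NOT the real ContextInternal).
class ToyContext(private val parent: ToyContext? = null) {
    fun duplicate(): ToyContext = ToyContext(unwrap())
    fun unwrap(): ToyContext = parent ?: this
}

// Same boolean shape as the proposed condition: run inline when the current context
// is the dispatcher's context, or a duplicate of it, and we are on its thread.
fun runsInline(current: ToyContext?, dispatcherContext: ToyContext, inThread: Boolean): Boolean =
    (current == dispatcherContext || current?.unwrap() == dispatcherContext) && inThread

fun main() {
    val root = ToyContext()
    val duplicate = root.duplicate()
    val otherDuplicate = root.duplicate()

    println(runsInline(duplicate, duplicate, inThread = true))      // true: exactly the same context
    println(runsInline(duplicate, root, inThread = true))           // true: duplicate of the dispatcher's context
    println(runsInline(duplicate, root, inThread = false))          // false: wrong thread, so deferred
    println(runsInline(null, root, inThread = true))                // false: no current context
    println(runsInline(otherDuplicate, duplicate, inThread = true)) // false: a different duplicate
}
```

In this model, a command arriving from a different duplicate of the same root is still deferred to the dispatcher's own context.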

tsegismont commented 1 year ago

Thank you @bbyk, I've filed #243.

I had only considered dispatchers created for a CoroutineVerticle context, which is the usual case.