vert-x3 / vertx-lang-kotlin

Vert.x for Kotlin
Apache License 2.0

Implement a Vert.x Web Router that provides support for suspending functions #253

Closed tsegismont closed 11 months ago

tsegismont commented 11 months ago

See #194

This router implements CoroutineScope to define a scope for new coroutines. Typically, this is the scope of a CoroutineVerticle.

tsegismont commented 11 months ago

@bbyk would you mind taking a look at this one?

sanyarnd commented 11 months ago

I can add more context from our experience with coroutines and point out possible challenges.

  1. Tracing is hard, especially if you bridge vertx to coroutines and back, e.g. vertx -> cohandler -> Dispatchers.IO -> HTTP/SQL client -> back to vertx. It's somewhat confusing, it is hard to get right, and it has limitations (for example, ctx.fail).
  2. Logging is also somewhat hard, because you need correlation traces dumped, but logging libraries are colorless, so they don't know about coroutines. It can be solved via a custom ThreadContextElement (e.g. MDCThreadContext), but Vertx uses its own variant of MDC (local data) and you need to connect them together.

For example, we're wrapping all client calls in "proxies" (transparent for user via custom CoroutineWebClient class with suspend functions):

suspend fun <T> onVertxContext(vertx: Vertx, block: suspend () -> T): T {
    val ctx = coroutineContext.tracingContext()
    val vertxCtx = when (val currentContext = Vertx.currentContext()) {
        null -> (vertx as VertxInternal).getOrCreateContext()
        else -> currentContext
    }.dispatcher()

    return withContext(vertxCtx) {
        VertxTracingUtils.setContext(ctx)
        try {
            block()
        } finally {
            VertxTracingUtils.clearContext()
        }
    }
}

Otherwise vertx HTTP/SQL client will not propagate context.

There's VertxTracer interface that provides receive/send methods, but it's, again, colorless, and the only way to pass traces is local data (not MDC):

    @Override
    public <R> Span sendRequest(
        final Context context,
        final SpanKind kind,
        final TracingPolicy policy,
        final R request,
        final String operation,
        final BiConsumer<String, String> headers,
        final TagExtractor<R> tagExtractor
    ) {
        if (TracingPolicy.IGNORE.equals(policy) || request == null) {
            return null;
        }

        io.opentelemetry.context.Context propagatedContext = context.getLocal(ACTIVE_CONTEXT);

        // ...

The point is, adding coroutine support is not as easy as adding a router, and if it is done without preparation it will make the code incompatible with other modules :( Coroutines are like a plague: you need to color a lot of code, otherwise they don't work as expected.

bbyk commented 11 months ago

@tsegismont I am curious whether you considered going the extension route. Basically, for the coHandler you can do something like

class CoRoute(scope: CoroutineScope) : CoroutineScope by scope {
  fun Route.coHandler(fn: suspend (RoutingContext) -> Unit) = handler {
      launch {
          try {
              fn(it)
          } catch (e: Exception) {
              it.fail(e)
          }
      }
  }
}

So then you can use a with block to have both receivers, e.g.

val scope = CoroutineScope(vertx.orCreateContext.dispatcher())
with(CoRoute(scope)) {
  Router.router(vertx).apply {
    get("/somePath").coHandler({ ... })
    get("/someOtherPath").coHandler({ ... })
    ...// more routes
  }
}

Could be less code this way

tsegismont commented 11 months ago

@bbyk thanks for the tip, I thought about this then:

fun withCoroutineSupport(
  vertx: Vertx,
  scope: CoroutineScope,
  block: CoroutineVertxSupport.() -> Unit
) {
  val coroutineSupport = object : CoroutineVertxSupport {
    override fun getVertx() = vertx
    override val coroutineContext = scope.coroutineContext
  }
  with(coroutineSupport) { block() }
}

interface CoroutineVertxSupport : CoroutineScope {

  fun getVertx(): Vertx

  fun Router.coErrorHandler(statusCode: Int, errorHandler: suspend (RoutingContext) -> Unit): Router =
    errorHandler(statusCode) {
      launch {
        try {
          errorHandler(it)
        } catch (e: Exception) {
          it.fail(e)
        }
      }
    }

  fun Route.coHandler(fn: suspend (RoutingContext) -> Unit): Route = handler {
    launch {
      try {
        fn(it)
      } catch (e: Exception) {
        it.fail(e)
      }
    }
  }

  fun Route.coFailureHandler(fn: suspend (RoutingContext) -> Unit): Route = failureHandler {
    launch {
      try {
        fn(it)
      } catch (e: Exception) {
        it.fail(e)
      }
    }
  }

  fun <T> Route.coRespond(fn: suspend (RoutingContext) -> T): Route = respond {
    val vertx = it.vertx() as VertxInternal
    val promise = vertx.promise<T>()
    launch {
      try {
        promise.complete(fn.invoke(it))
      } catch (e: Exception) {
        promise.fail(e)
      }
    }
    promise.future()
  }
}

We can make CoroutineVerticle implement CoroutineVertxSupport:

abstract class CoroutineVerticle : Verticle, CoroutineVertxSupport

And we still get a good UX when implementing the webapp:

class TestVerticle : CoroutineVerticle() {

  @Volatile
  var actualPort: Int = 0

  override suspend fun start() {
    val router = Router.router(vertx)
    router.coErrorHandler(404) { rc ->
      delay(100)
      rc.response().setStatusCode(404).end("Too bad...")
    }
    router.route().handler { rc ->
      rc.put("capturedContext", ContextInternal.current())
      rc.next()
    }
    router.get("/suspendingHandler").coHandler { rc ->
      delay(100)
      val current = ContextInternal.current()
      if (current.isDuplicate && current == rc.get("capturedContext")) {
        rc.end()
      } else {
        rc.fail(500)
      }
    }
    router.get("/suspendingRespond").coRespond { rc ->
      delay(100)
      val current = ContextInternal.current()
      if (!current.isDuplicate || current != rc.get("capturedContext")) {
        throw RuntimeException()
      }
      "foobar"
    }
    router.get("/suspendingFailureHandler").coHandler { it.fail(RuntimeException()) }.coFailureHandler { rc ->
      delay(100)
      val current = ContextInternal.current()
      if (current.isDuplicate && current == rc.get("capturedContext")) {
        rc.end("baz")
      } else {
        rc.response().setStatusCode(500).end()
      }
    }
    val externalRouteHandler = ExternalRouteHandler()
    router.get("/externalRoute").coHandler { externalRouteHandler.handle(it) }
    router.route("/parent/*").subRouter(createSubRouter(vertx, this))
    val httpServer = vertx.createHttpServer()
      .requestHandler(router)
      .listen(0)
      .await()
    actualPort = httpServer.actualPort()
  }
}

class ExternalRouteHandler {
  suspend fun handle(rc: RoutingContext) {
    delay(100)
    val current = ContextInternal.current()
    if (!current.isDuplicate || current != rc.get("capturedContext")) {
      rc.fail(500)
      // Stop here so the failed request is not also ended below.
      return
    }
    rc.end("someone kicked the ball")
  }
}

suspend fun createSubRouter(vertx: Vertx, scope: CoroutineScope): Router {
  val router = Router.router(vertx)
  withCoroutineSupport(vertx, scope) {
    router.get("/child").coRespond { rc ->
      delay(100)
      val current = ContextInternal.current()
      if (!current.isDuplicate || current != rc.get("capturedContext")) {
        throw RuntimeException()
      }
      "Hello, IT"
    }
  }
  return router
}

It's very similar to your proposal except that:

tsegismont commented 11 months ago

Otherwise vertx HTTP/SQL client will not propagate context.

@sanyarnd this may be solved by #234

bbyk commented 11 months ago

@tsegismont It is great! A couple of minor notes:

  1. I know I used e: Exception myself in my excerpt but maybe it's safer to use
      } catch (e: Throwable) {
        it.fail(e)
      }
  2. For coRespond you might be able to re-use the vertxFuture fn from VertxCoroutine.kt (off-topic: I like the ergonomics of https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/future.html more, and having a similarly modeled API, an extension on CoroutineScope plus a start param, instead of or along with vertxFuture would be great).

bbyk commented 11 months ago

@tsegismont

  1. In the same vein, you might want to consider making withCoroutineSupport an extension on CoroutineScope, e.g.

    fun CoroutineScope.withCoroutineSupport(
      vertx: Vertx,
      block: CoroutineVertxSupport.() -> Unit
    ) {
      val coroutineSupport = object : CoroutineVertxSupport {
        override fun getVertx() = vertx
        // The receiver scope takes the place of the former scope parameter.
        override val coroutineContext = this@withCoroutineSupport.coroutineContext
      }
      with(coroutineSupport) { block() }
    }

so that you would just have

suspend fun CoroutineScope.createSubRouter(vertx: Vertx): Router {
  val router = Router.router(vertx)
  withCoroutineSupport(vertx) {
    router.get("/child").coRespond { rc ->
      delay(100)
      val current = ContextInternal.current()
      if (!current.isDuplicate || current != rc.get("capturedContext")) {
        throw RuntimeException()
      }
      "Hello, IT"
    }
  }
  return router
}

... router.route("/parent/*").subRouter(createSubRouter(vertx))

tsegismont commented 11 months ago

@bbyk I tried your proposal with EventBus and it works quite well too:

fun CoroutineScope.withCoroutineEventBus(
  vertx: Vertx,
  block: CoroutineEventBusSupport.() -> Unit
) {
  val coroutineSupport = object : CoroutineEventBusSupport {
    override fun getVertx() = vertx
    override val coroutineContext = this@withCoroutineEventBus.coroutineContext
  }
  with(coroutineSupport) { block() }
}

interface CoroutineEventBusSupport : CoroutineScope {

  fun getVertx(): Vertx

  fun <T> MessageConsumer<T>.coHandler(block: suspend (Message<T>) -> Unit): MessageConsumer<T> = handler {
    launch {
      try {
        block(it)
      } catch (e: Exception) {
        it.fail(RECIPIENT_FAILURE.toInt(), e.message)
      }
    }
  }

  fun <T> EventBus.coConsumer(address: String, block: suspend (Message<T>) -> Unit): MessageConsumer<T> =
    consumer<T>(address).coHandler(block)
}

This is quite a compact form compared to the existing code.

tsegismont commented 11 months ago

So in the second iteration, I pushed this:

fun CoroutineScope.coroutineRouter(
  vertx: Vertx,
  block: CoroutineRouterSupport.() -> Unit
) = with(object : CoroutineRouterSupport {
  override fun getVertx() = vertx
  override val coroutineContext = this@coroutineRouter.coroutineContext
}) { block() }

/**
 * Adds support for suspending function in the Vert.x Web [Router].
 *
 * Objects of this type implement [CoroutineScope] to define a scope for new coroutines.
 * Typically, this is the scope of [CoroutineVerticle].
 */
interface CoroutineRouterSupport : CoroutineScope {

  /**
   * The [Vertx] instance related to this scope.
   */
  fun getVertx(): Vertx

  /**
   * The [CoroutineDispatcher] used to dispatch new coroutines.
   *
   * By default, this is the [Vertx.dispatcher].
   */
  fun getDispatcher(): CoroutineDispatcher = getVertx().dispatcher()

  /**
   * Similar to [Router.errorHandler] but using a suspended [errorHandler].
   */
  fun Router.coErrorHandler(statusCode: Int, errorHandler: suspend (RoutingContext) -> Unit): Router =
    errorHandler(statusCode) {
      launch(getDispatcher()) {
        try {
          errorHandler(it)
        } catch (t: Throwable) {
          it.fail(t)
        }
      }
    }

  /**
   * Similar to [Route.handler] but using a suspended [requestHandler].
   */
  fun Route.coHandler(requestHandler: suspend (RoutingContext) -> Unit): Route = handler {
    launch(getDispatcher()) {
      try {
        requestHandler(it)
      } catch (t: Throwable) {
        it.fail(t)
      }
    }
  }

  /**
   * Similar to [Route.failureHandler] but using a suspended [failureHandler].
   */
  fun Route.coFailureHandler(failureHandler: suspend (RoutingContext) -> Unit): Route = failureHandler {
    launch(getDispatcher()) {
      try {
        failureHandler(it)
      } catch (t: Throwable) {
        it.fail(t)
      }
    }
  }

  /**
   * Similar to [Route.respond] but using a suspended [function].
   */
  fun <T> Route.coRespond(function: suspend (RoutingContext) -> T): Route = respond {
    val vertx = it.vertx() as VertxInternal
    val promise = vertx.promise<T>()
    launch(getDispatcher()) {
      try {
        promise.complete(function.invoke(it))
      } catch (t: Throwable) {
        it.fail(t)
      }
    }
    promise.future()
  }
}

Then, a user only has to add the CoroutineRouterSupport to their CoroutineVerticle to get the extensions:

class TestVerticle : CoroutineVerticle(), CoroutineRouterSupport

By default, CoroutineRouterSupport uses vertx.dispatcher() when launching coroutines. To get more control, getDispatcher can be overridden.

sanyarnd commented 11 months ago

Otherwise vertx HTTP/SQL client will not propagate context.

@sanyarnd this may be solved by #234

Thanks, that looks like exactly what I've been waiting for: a bridge between vertx and coroutines at the library level, though I can't see changes for context restoration (e.g. ThreadContextElement).

For example:

Inbound request (Vertx context/thread)
  => switch to computation-sensitive task via Dispatchers.IO or ExecutorService
     => Vertx HTTP/SQL client call (need to restore context here and propagate it)
  => continue on vertx thread

I suppose that to solve such cases one still has to implement one's own helper methods, right?

bbyk commented 11 months ago

I wonder if it makes sense to double down on some more idiomatic Kotlin coroutine patterns here. For example, with the following code:

data class VertxContext(val context: Context) : AbstractCoroutineContextElement(VertxContext) {
    companion object Key : CoroutineContext.Key<VertxContext>
}

fun CoroutineScope.vertxContext() = coroutineContext[VertxContext] ?: error("Current context doesn't contain VertxContext in it: $this")
fun CoroutineScope.vertx() = vertxContext().context.owner() // unwrap the Vert.x Context before asking for its owner

fun Context.coroutineContext(): CoroutineContext = dispatcher() + VertxContext(this)

you can basically drop #getVertx and #getDispatcher from CoroutineRouterSupport. For example, in coHandler you no longer need to call launch(getDispatcher()) {, just launch {, which uses the current dispatcher; that dispatcher can be overridden with the more idiomatic withContext pattern.
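As a hedged illustration of the context-element idea, here is a minimal sketch that needs only the Kotlin standard library. FakeVertxContext and VertxContextElement are hypothetical stand-ins invented for this example (they are not Vert.x or PR types); the point is only to show how an element is stored under a key, looked up, and replaced.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for io.vertx.core.Context, so the sketch has no Vert.x dependency.
class FakeVertxContext(val name: String)

// Context element carrying the stand-in context; the companion object serves as its key.
class VertxContextElement(val context: FakeVertxContext) :
  AbstractCoroutineContextElement(VertxContextElement) {
  companion object Key : CoroutineContext.Key<VertxContextElement>
}

// Looks the element up by key and fails loudly when it is absent.
fun CoroutineContext.vertxContext(): FakeVertxContext =
  this[VertxContextElement]?.context ?: error("No VertxContextElement in $this")

fun main() {
  val base: CoroutineContext =
    EmptyCoroutineContext + VertxContextElement(FakeVertxContext("event-loop"))
  println(base.vertxContext().name) // prints "event-loop"

  // Adding an element with the same key replaces the previous one.
  val perRequest = base + VertxContextElement(FakeVertxContext("duplicated"))
  println(perRequest.vertxContext().name) // prints "duplicated"
}
```

Because a later element with the same key wins, a per-request context could in principle shadow the base one without touching the rest of the coroutine context.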

And coroutineRouter can be like this

fun CoroutineScope.coroutineRouter(
  context: CoroutineContext = EmptyCoroutineContext,
  block: CoroutineRouterSupport.(vertx: Vertx) -> Unit
) {
  val newContext = coroutineContext.newCoroutineContext(context)
  val vertx = newContext.vertx()
  with(object : CoroutineRouterSupport {
    override val coroutineContext = newContext
  }) { block(vertx) }
}

If your CoroutineScope is not derived from Vert.x, you can call the function like so

coroutineRouter(vertx.orCreateContext.coroutineContext()) {
  router.get("/child").coRespond { rc ->

Or if you want to run on different dispatcher, you can always do this

withContext(differentDispatcher) {
 coroutineRouter {
      router.get("/child").coRespond { rc ->

bbyk commented 11 months ago

I might be wrong on this one:

For example for coHandler you no longer need to call launch(getDispatcher()) { just launch { to use the current dispatcher which can be overridden with more idiomatic withContext pattern.

It seems that the coHandler would need to explicitly pick up the current Vertx context in which the handler is called because it's going to be that DuplicatedContext for just that http call and the coroutine should stay on it

  fun Route.coHandler(requestHandler: suspend (RoutingContext) -> Unit): Route = handler {
    launch(Vertx.currentContext().coroutineContext()) {

tsegismont commented 11 months ago

I suppose that to solve such cases one still has to implement one's own helper methods, right?

@sanyarnd Possibly, yes. Would you mind sharing with me a small reproducer on a public Git repo? So I fully understand the case you're talking about. Thanks

tsegismont commented 11 months ago

I might be wrong on this one:

For example for coHandler you no longer need to call launch(getDispatcher()) { just launch { to use the current dispatcher which can be overridden with more idiomatic withContext pattern.

It seems that the coHandler would need to explicitly pick up the current Vertx context in which the handler is called because it's going to be that DuplicatedContext for just that http call and the coroutine should stay on it

  fun Route.coHandler(requestHandler: suspend (RoutingContext) -> Unit): Route = handler {
    launch(Vertx.currentContext().coroutineContext()) {

It might not be a problem, thanks to the changes made recently. A dispatcher created for a base context should take duplicated contexts into account, both when evaluating whether a dispatch is needed and when dispatching.

tsegismont commented 11 months ago

@bbyk I've given your proposal some thought today.

I agree, we can drop the getVertx and getDispatcher methods. Instead, we shall let the user provide a CoroutineContext (empty by default) when they invoke coHandler and others. That's how launch is designed anyway.
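To make the "CoroutineContext, empty by default" shape concrete, here is a rough sketch that uses only the Kotlin standard library. It is not the code from the PR: FakeRoutingContext is a hypothetical stand-in for RoutingContext, and startCoroutine stands in for launch, so no dispatcher is involved. It only illustrates the optional context parameter and the routing of failures to fail.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for io.vertx.ext.web.RoutingContext.
class FakeRoutingContext {
  var failure: Throwable? = null
  var body: String? = null
  fun fail(t: Throwable) { failure = t }
  fun end(chunk: String) { body = chunk }
}

// Turns a suspending handler into a plain callback. The caller may pass extra
// context elements; by default nothing is added, similar to launch(context).
fun coHandler(
  context: CoroutineContext = EmptyCoroutineContext,
  requestHandler: suspend (FakeRoutingContext) -> Unit
): (FakeRoutingContext) -> Unit = { rc ->
  val body: suspend () -> Unit = { requestHandler(rc) }
  // Assumption: startCoroutine replaces launch here; any failure is reported via rc.fail.
  body.startCoroutine(Continuation(context) { result ->
    result.exceptionOrNull()?.let { rc.fail(it) }
  })
}

fun main() {
  val ok = FakeRoutingContext()
  val okHandler = coHandler { it.end("done") }
  okHandler(ok)
  println(ok.body) // prints "done"

  val failed = FakeRoutingContext()
  val failingHandler = coHandler { throw IllegalStateException("boom") }
  failingHandler(failed)
  println(failed.failure?.message) // prints "boom"
}
```

With a real CoroutineScope, the same parameter would simply be forwarded to launch, which already combines it with the scope's own context.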

I believe your idea of creating a coroutine context element key for the Vert.x context is a good one. However, it can be done in another pull-request and is not required to implement a Vert.x Web Router that provides support for suspending functions.

You're welcome to make another review if you have time for this. I'm going to merge the PR tomorrow so that we can get this enhancement in Vert.x 4.5

Thanks again for your very good feedback. It is much appreciated.

bbyk commented 11 months ago

@tsegismont Thank you!

I still think the correct implementation might be to always launch on the current context to cover all corner cases. Please see my comment on the PR (https://github.com/vert-x3/vertx-lang-kotlin/pull/253/files#r1369034860). For example, delay(100) only works because Vertx#setTimer captures the current context, but any functionality that relies solely on the current dispatcher will not end up on the DuplicatedContext.

bbyk commented 11 months ago

You're welcome to make another review if you have time for this. I'm going to merge the PR tomorrow so that we can get this enhancement in Vert.x 4.5

@tsegismont Sorry for the off-topic: please check out https://github.com/vert-x3/vertx-lang-kotlin/issues/256 for the upcoming release