knative-extensions / eventing-kafka-broker

Alternate Kafka Broker implementation.
Apache License 2.0

kafka-broker-dispatcher "java.lang.IllegalArgumentException: Cannot schedule a timer with delay < 1 ms" with "backoffDelay: PT0.01S" #2814

Closed maschmid closed 11 months ago

maschmid commented 1 year ago

Describe the bug: Having a Kafka Broker with

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: broker
spec:
  delivery:
    backoffDelay: PT0.01S
    backoffPolicy: linear
    retry: 5
...

and making the delivery fail does not produce the expected 5 retries with a 10 ms linear backoff. Instead, the dispatcher logs

{"@timestamp":"2022-11-25T19:52:28.963Z","@version":"1","message":"Failed to send event to subscriber topic=knative-broker-test-qyxpppux-broker partition=2 offset=0","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl","thread_name":"vert.x-worker-thread-11","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalArgumentException: Cannot schedule a timer with delay < 1 ms\n\tat io.vertx.core.impl.VertxImpl.scheduleTimeout(VertxImpl.java:509)\n\tat io.vertx.core.impl.VertxImpl.setTimer(VertxImpl.java:363)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.retry(WebClientCloudEventSender.java:145)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$2(WebClientCloudEventSender.java:134)\n\tat io.vertx.core.impl.future.Composition.onFailure(Composition.java:50)\n\tat io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)\n\tat io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)\n\tat io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$5(WebClientCloudEventSender.java:174)\n\tat io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91)\n\tat io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)\n\tat io.vertx.ext.web.client.impl.HttpContext.handleDispatchResponse(HttpContext.java:400)\n\tat io.vertx.ext.web.client.impl.HttpContext.execute(HttpContext.java:387)\n\tat io.vertx.ext.web.client.impl.HttpContext.next(HttpContext.java:365)\n\tat io.vertx.ext.web.client.impl.HttpContext.fire(HttpContext.java:332)\n\tat io.vertx.ext.web.client.impl.HttpContext.dispatchResponse(HttpContext.java:294)\n\tat io.vertx.ext.web.client.impl.HttpContext.lambda$handleReceiveResponse$8(HttpContext.java:550)\n\tat io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)\n\tat io.vertx.core.impl.WorkerContext.lambda$run$1(WorkerContext.java:83)\n\tat io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n","topic":"knative-broker-test-qyxpppux-broker","partition":2,"offset":0}

Unrolled stack trace:

java.lang.IllegalArgumentException: Cannot schedule a timer with delay < 1 ms
    at io.vertx.core.impl.VertxImpl.scheduleTimeout(VertxImpl.java:509)
    at io.vertx.core.impl.VertxImpl.setTimer(VertxImpl.java:363)
    at dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.retry(WebClientCloudEventSender.java:145)
    at dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$2(WebClientCloudEventSender.java:134)
    at io.vertx.core.impl.future.Composition.onFailure(Composition.java:50)
    at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
    at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
    at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
    at dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$5(WebClientCloudEventSender.java:174)
    at io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91)
    at io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
    at io.vertx.ext.web.client.impl.HttpContext.handleDispatchResponse(HttpContext.java:400)
    at io.vertx.ext.web.client.impl.HttpContext.execute(HttpContext.java:387)
    at io.vertx.ext.web.client.impl.HttpContext.next(HttpContext.java:365)
    at io.vertx.ext.web.client.impl.HttpContext.fire(HttpContext.java:332)
    at io.vertx.ext.web.client.impl.HttpContext.dispatchResponse(HttpContext.java:294)
    at io.vertx.ext.web.client.impl.HttpContext.lambda$handleReceiveResponse$8(HttpContext.java:550)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
    at io.vertx.core.impl.WorkerContext.lambda$run$1(WorkerContext.java:83)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
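
For context, the exception is raised by Vert.x, whose timer rejects any delay below 1 ms, as the message states. Below is a minimal sketch of a defensive guard, assuming the computed retry delay can end up below that minimum (this issue does not establish why it does). `TimerDelayGuard` and `clampTimerDelay` are hypothetical names for illustration, not code from the dispatcher and not necessarily how the project addressed the problem:

```java
public class TimerDelayGuard {
    // Smallest delay the timer accepts, per the error message in the trace above.
    public static final long MIN_TIMER_DELAY_MS = 1L;

    // Hypothetical helper: raise any computed delay below the minimum up to 1 ms
    // so that the value can be passed to a timer without being rejected.
    public static long clampTimerDelay(long computedDelayMs) {
        return Math.max(MIN_TIMER_DELAY_MS, computedDelayMs);
    }

    public static void main(String[] args) {
        System.out.println(clampTimerDelay(0));   // prints 1
        System.out.println(clampTimerDelay(10));  // prints 10
    }
}
```

Clamping hides the symptom only. If the delay is wrong because of a unit or parsing mismatch, the retries would still fire at the wrong intervals.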

Expected behavior: Retries should be attempted at the linear intervals specified by the policy (10 ms + N*10 ms).
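
To illustrate the expected schedule, the sketch below computes linear delays from the ISO-8601 `backoffDelay` value. It assumes the delay before the N-th retry (counting from 1) is `backoffDelay * N`, which matches the 10 ms, 20 ms, ... reading above. `LinearBackoffSketch` and `linearDelaysMillis` are hypothetical names, not part of the broker code:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class LinearBackoffSketch {
    // Hypothetical helper: returns the delay in milliseconds before each retry,
    // assuming a linear policy where retry n (1-based) waits backoffDelay * n.
    public static List<Long> linearDelaysMillis(String isoBackoffDelay, int retries) {
        Duration base = Duration.parse(isoBackoffDelay); // "PT0.01S" parses to 10 ms
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= retries; attempt++) {
            delays.add(base.multipliedBy(attempt).toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the Broker spec above: backoffDelay PT0.01S, retry 5.
        System.out.println(linearDelaysMillis("PT0.01S", 5)); // prints [10, 20, 30, 40, 50]
    }
}
```

Every delay in this schedule is at least 10 ms, so none of them should fall below the 1 ms minimum named in the exception.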

To Reproduce: Create the broker as above and point it at a subscriber that fails (e.g. a URL returning a 404).

Knative release version: 1.5.0

Additional context: Installed via the OpenShift Serverless operator https://github.com/openshift-knative/serverless-operator/tree/release-1.26

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

pierDipi commented 1 year ago

/remove-lifecycle stale
