line / decaton

High throughput asynchronous task processing on Apache Kafka
Apache License 2.0

Retried tasks are discarded during deserialization when using DefaultTaskExtractor #123

Open hktechn0 opened 3 years ago

hktechn0 commented 3 years ago

We are consuming a Kafka topic using a Decaton processor with retrying enabled.

However, ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor) does not work correctly with DefaultTaskExtractor.

retryTaskExtractor unwraps the DecatonTaskRequest using DefaultTaskExtractor and then calls taskExtractor.extract() on the unwrapped payload here: https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76

However, if taskExtractor is itself a DefaultTaskExtractor, or a TaskExtractor that delegates deserialization to DefaultTaskExtractor, the second deserialization inside retryTaskExtractor fails and the retried task is discarded.
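To make the double unwrapping concrete, here is a minimal, self-contained sketch. It does not use Decaton's real classes: the one-byte `MAGIC` envelope, `envelopeExtractor()` and `retryExtractor()` are hypothetical stand-ins for the DecatonTaskRequest format, DefaultTaskExtractor and the retry extractor built by `consuming(String, TaskExtractor)`, and the model is an assumption based on the description above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

public class DoubleUnwrapSketch {
    // Hypothetical envelope marker standing in for the DecatonTaskRequest wire format.
    public static final byte MAGIC = 0x7E;

    // Wraps a payload in the toy envelope: [MAGIC][payload bytes...].
    public static byte[] wrap(byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = MAGIC;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    // Strips the envelope, failing if the bytes do not start with MAGIC.
    public static byte[] unwrap(byte[] bytes) {
        if (bytes.length == 0 || bytes[0] != MAGIC) {
            throw new IllegalArgumentException("not an envelope");
        }
        return Arrays.copyOfRange(bytes, 1, bytes.length);
    }

    // Stand-in for DefaultTaskExtractor: expects an envelope and decodes its payload.
    public static Function<byte[], String> envelopeExtractor() {
        return bytes -> new String(unwrap(bytes), StandardCharsets.UTF_8);
    }

    // Stand-in for the retry extractor: unwraps the envelope first,
    // then hands the bare payload to the user-supplied extractor.
    public static Function<byte[], String> retryExtractor(Function<byte[], String> userExtractor) {
        return bytes -> userExtractor.apply(unwrap(bytes));
    }

    // Runs an extractor and reports success or failure as a string.
    public static String describe(Function<byte[], String> extractor, byte[] record) {
        try {
            return "ok: " + extractor.apply(record);
        } catch (IllegalArgumentException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] record = wrap("hello".getBytes(StandardCharsets.UTF_8));

        // A user extractor that reads the bare payload works on the retry path.
        Function<byte[], String> plain = bytes -> new String(bytes, StandardCharsets.UTF_8);
        System.out.println(describe(retryExtractor(plain), record));

        // A user extractor that itself expects an envelope sees already-unwrapped
        // bytes on the retry path, so its own unwrap step fails.
        System.out.println(describe(retryExtractor(envelopeExtractor()), record));
    }
}
```

In this toy model the second call fails for the same reason as the stack trace below: the inner extractor is handed bytes that are no longer in the envelope format it expects.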

Stacktrace:

java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
    at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
    at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
    at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
    at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
    at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
    at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
    at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
    at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
    at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
    at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
    at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
    at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
    at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
    at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
    at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
    at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
    at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
    at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
    at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
    ... 9 common frames omitted

We passed our own com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor as taskExtractor. It looks like the following (written in Kotlin). TimedTaskExtractor exists to observe consumption delay. The issue occurs when delegate is a DefaultTaskExtractor.

class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.toLowerCase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}
ocadaruma commented 3 years ago

Thanks for pointing this out.

Yeah, that's a legacy from when Decaton didn't support consuming arbitrary task formats. (Initially, Decaton only supported the DecatonTaskRequest protobuf format.)

When we introduced TaskExtractor to support arbitrary message formats, there was a discussion about how to support the retry feature.

~In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format.~

In short, you have to use ProcessorsBuilder#consuming(String, Deserializer) to consume the DecatonTaskRequest format.

I guess we can solve this problem fundamentally with this attempt (https://github.com/line/decaton/pull/80), but unfortunately it is currently stuck due to our lack of resources.

hktechn0 commented 3 years ago

> In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format.

We are consuming tasks in the DecatonTaskRequest format with DefaultTaskExtractor (wrapped in another task extractor), with retrying enabled.

I think the problem in the code is here: https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76

If we pass a Deserializer to .consuming(), new DefaultTaskExtractor<>(deserializer) is used for both taskExtractor and retryTaskExtractor. https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L63

But if we pass a TaskExtractor to .consuming(), retryTaskExtractor first unwraps the record with DefaultTaskExtractor (to support topics in non-DecatonTaskRequest formats) and then applies taskExtractor, even if taskExtractor is itself a DefaultTaskExtractor. https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76

If we call .consuming() with DefaultTaskExtractor, the DecatonTaskRequest is deserialized twice and the second attempt fails. I know TaskExtractor is intended mainly for non-DecatonTaskRequest formats, but this behavior causes a bug. If DefaultTaskExtractor is not supported there, I think .consuming() should throw an error or log a warning.

ocadaruma commented 3 years ago

> In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format

This comment of mine was a bit wrong. Precisely, you have to use ProcessorsBuilder#consuming(String, Deserializer) to consume the DecatonTaskRequest format.

> If we call .consuming() with DefaultTaskExtractor, deserialize DecatonTaskRequest 2 times and it failed.

Yeah, right. I understand that this behavior could cause a bug, but since DefaultTaskExtractor is located in an internal package, we don't expect users to use it directly.

Throwing an exception or logging is an option, but if users "wrap" DefaultTaskExtractor in an outer extractor (like you did), such a check may not catch it.
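The limitation of a fail-fast check can be illustrated with a small, self-contained sketch. Everything here is hypothetical: `Extractor`, `EnvelopeExtractor`, `TimedExtractor` and `rejectEnvelopeExtractor()` are stand-ins invented for illustration and are not part of Decaton. The sketch assumes the check would be a simple `instanceof` test.

```java
import java.nio.charset.StandardCharsets;

public class GuardSketch {
    // Hypothetical stand-in for the TaskExtractor interface.
    public interface Extractor {
        String extract(byte[] bytes);
    }

    // Hypothetical stand-in for the internal DefaultTaskExtractor.
    public static final class EnvelopeExtractor implements Extractor {
        @Override
        public String extract(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // User-defined wrapper, analogous to TimedTaskExtractor: it delegates
    // extraction and only adds a side effect (here, counting calls).
    public static final class TimedExtractor implements Extractor {
        private final Extractor delegate;
        private long calls;

        public TimedExtractor(Extractor delegate) {
            this.delegate = delegate;
        }

        @Override
        public String extract(byte[] bytes) {
            calls++;
            return delegate.extract(bytes);
        }

        public long calls() {
            return calls;
        }
    }

    // Hypothetical fail-fast guard: rejects the envelope extractor by type.
    public static void rejectEnvelopeExtractor(Extractor extractor) {
        if (extractor instanceof EnvelopeExtractor) {
            throw new IllegalArgumentException(
                    "envelope extractor must not be passed as a custom extractor");
        }
    }

    // Returns true if the guard rejects the given extractor.
    public static boolean isRejected(Extractor extractor) {
        try {
            rejectEnvelopeExtractor(extractor);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Passing the envelope extractor directly is caught by the type check.
        System.out.println(isRejected(new EnvelopeExtractor()));

        // Wrapping it hides the concrete type, so the same check passes
        // even though the wrapper still delegates to the envelope extractor.
        System.out.println(isRejected(new TimedExtractor(new EnvelopeExtractor())));
    }
}
```

A type-based guard only sees the outermost object, so a delegating wrapper slips through unless the interface exposes what it delegates to.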

I think adding a caution about this usage to DefaultTaskExtractor's javadoc is sufficient for now.