red-kite-solutions / stalker

Stalker, the Extensible Attack Surface Management tool.
https://wiki.stalker.red-kite.io/
GNU General Public License v3.0

Orchestrator infinite crash loop when a Kafka message is not deserializable #218

Open · lm-sec opened this issue 9 months ago

lm-sec commented 9 months ago

The error is triggered when a custom job parameter's value is an array instead of a string (here, $.customJobParameters[1].value). This breaks deserialization, but the message is not dropped. The orchestrator therefore crashes again as soon as it retries reading the same message, creating an infinite crash loop.
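The crash loop can be illustrated with a simplified, in-memory model. This is only a sketch: it does not use Kafka or Confluent.Kafka and does not reflect how the client actually stores offsets. The "partition" list, the Run helper, and the maxAttempts cap are hypothetical; messages are deserialized as string-to-string dictionaries rather than the orchestrator's real job type. The model shows that if the read position only advances on success, one poison message stalls everything behind it, while skipping it lets the consumer move on.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical in-memory partition: one serialized message per entry.
var partition = new List<string>
{
    "{\"name\":\"a\",\"value\":\"1\"}",
    "{\"name\":\"b\",\"value\":[\"x\",\"y\"]}", // poison message: value is an array
    "{\"name\":\"c\",\"value\":\"3\"}",
};

// Simplified consumer model (not Confluent.Kafka's real offset handling): the read
// position only advances when a message deserializes, unless skipPoison is set.
// maxAttempts caps the failures so the "infinite" loop terminates in this demo.
static (int Processed, int Failures) Run(List<string> messages, bool skipPoison, int maxAttempts)
{
    int position = 0, processed = 0, failures = 0;
    while (position < messages.Count && failures < maxAttempts)
    {
        try
        {
            JsonSerializer.Deserialize<Dictionary<string, string>>(messages[position]);
            processed++;
            position++;
        }
        catch (JsonException)
        {
            failures++;
            // Without skipping, the position stays put and the same message is read again.
            if (skipPoison) position++;
        }
    }
    return (processed, failures);
}

var stuck = Run(partition, skipPoison: false, maxAttempts: 5);
var skipped = Run(partition, skipPoison: true, maxAttempts: 5);
Console.WriteLine($"no skip: processed={stuck.Processed}, failures={stuck.Failures}");   // processed=1, failures=5
Console.WriteLine($"skip:    processed={skipped.Processed}, failures={skipped.Failures}"); // processed=2, failures=1
```

Without skipping, the third message is never reached no matter how many retries are allowed; only the cap stops the loop.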

Here is the error message:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
       ---> System.Text.Json.JsonException: The JSON value could not be converted to System.String. Path: $.customJobParameters[1].value | LineNumber: 0 | BytePositionInLine: 1675.
       ---> System.InvalidOperationException: Cannot get the value of a token type 'StartArray' as a string.
         at System.Text.Json.ThrowHelper.ThrowInvalidOperationException_ExpectedString(JsonTokenType tokenType)
         at System.Text.Json.Utf8JsonReader.GetString()
         at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
         at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
         at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
         at System.Text.Json.Serialization.JsonCollectionConverter`2.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, TCollection& value)
         at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
         at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
         at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
         at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
         at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
         --- End of inner exception stack trace ---
         at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, Utf8JsonReader& reader, Exception ex)
         at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
         at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo jsonTypeInfo, Nullable`1 actualByteCount)
         at System.Text.Json.JsonSerializer.Deserialize[TValue](ReadOnlySpan`1 utf8Json, JsonSerializerOptions options)
         at Orchestrator.Queue.JobsConsumer.JobSerializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context) in /app/Orchestrator/Queue/JobsConsumer/JobSerializer.cs:line 29
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         --- End of inner exception stack trace ---
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
         at Orchestrator.Queue.KafkaConsumer`1.<>c__DisplayClass1_1.<<-ctor>b__0>d.MoveNext() in /app/Orchestrator/Queue/KafkaConsumer.cs:line 34
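The innermost failure in this trace can be reproduced with System.Text.Json alone. The sketch below uses a hypothetical payload shaped like the failing message (the real message schema is not shown in this issue) and reads each parameter as a string-to-string dictionary instead of the orchestrator's actual job types. It shows that the deserializer itself throws a JsonException wrapping an InvalidOperationException, matching the two inner exceptions in the trace; the outer ConsumeException is added by Confluent.Kafka's Consume.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical payload: the second custom job parameter has an array
// where a string "value" is expected.
const string payload =
    "{\"customJobParameters\":[{\"name\":\"a\",\"value\":\"1\"},{\"name\":\"b\",\"value\":[\"x\",\"y\"]}]}";

Exception? caught = null;
try
{
    // Every parameter is read as string -> string, so the array value cannot be converted.
    JsonSerializer.Deserialize<Dictionary<string, List<Dictionary<string, string>>>>(payload);
}
catch (Exception e)
{
    caught = e;
}

// The deserializer throws JsonException, with the reader's InvalidOperationException inside it.
Console.WriteLine(caught?.GetType().Name);                 // JsonException
Console.WriteLine(caught?.InnerException?.GetType().Name); // InvalidOperationException
```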

A potential solution could be to catch the exception and ignore it, effectively skipping the message. I don't know whether that works.

https://jonboulineau.me/blog/kafka/dealing-with-bad-records-in-kafka

Here is a draft of a potential implementation in JsonSerializer.cs:

    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return default;

        try
        {
            return JsonSerializer.Deserialize<T>(data.ToArray()) ?? default;
        }
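        // The deserializer throws a JsonException (see the stack trace above);
        // Confluent.Kafka only wraps it in a ConsumeException after it propagates
        // out of Deserialize, so this catch never matches.
        catch (ConsumeException e)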
        {
            Console.WriteLine("Error message here");
            return default;
        }
    }

Again, I'm not sure it would work.

lm-sec commented 9 months ago

The modification to JsonSerializer.cs does not work.

lm-sec commented 9 months ago

The issue is in JobSerializer.cs. We now catch the exception (a JsonException) and soft-fail. Better logging, retries, and similar handling would be an improvement, but it is no longer a bug in my branch.
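A minimal sketch of the soft-fail approach, not the code in the branch. It assumes the job type is a reference type so that a skipped message can be signalled with null. It mirrors the Deserialize(data, isNull, context) shape from the draft but drops Confluent.Kafka's SerializationContext parameter so it runs without the package; SoftDeserialize and the sample payloads are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Soft-failing deserializer body: returns null instead of throwing on bad JSON,
// so the consumer can move past the message instead of crashing on it.
static T? SoftDeserialize<T>(ReadOnlySpan<byte> data, bool isNull) where T : class
{
    if (isNull) return null;
    try
    {
        return JsonSerializer.Deserialize<T>(data);
    }
    catch (JsonException e)
    {
        // Log enough to find the bad record later; the message itself is dropped.
        Console.Error.WriteLine($"Skipping undeserializable message at {e.Path}: {e.Message}");
        return null;
    }
}

var good = Encoding.UTF8.GetBytes("{\"name\":\"a\",\"value\":\"1\"}");
var bad = Encoding.UTF8.GetBytes("{\"name\":\"b\",\"value\":[\"x\",\"y\"]}");

var ok = SoftDeserialize<Dictionary<string, string>>(good, isNull: false);
var skipped = SoftDeserialize<Dictionary<string, string>>(bad, isNull: false);
Console.WriteLine(ok?["value"]);    // 1
Console.WriteLine(skipped is null); // True
```

With this design, callers have to treat a null result as "skip this message" rather than as a job to run; logging the JSON path makes the dropped record easier to trace.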