confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Kafka Connect JSON Deserialization #1389

Closed. tonysneed closed this issue 4 years ago.

tonysneed commented 4 years ago

Description

I have a Postgres connector that writes the following message:

[
    {
        "topic": "dbserver1.public.person",
        "partition": 0,
        "offset": 0,
        "timestamp": 1598317085052,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": {
            "person_id": 1
        },
        "value": {
            "before": null,
            "after": {
                "person_id": 1,
                "name": "Tony Sneed",
                "favorite_color": "Green",
                "age": 29
            },
            "source": {
                "version": "1.2.1.Final",
                "connector": "postgresql",
                "name": "dbserver1",
                "ts_ms": 1598317083251,
                "snapshot": "false",
                "db": "source-database",
                "schema": "public",
                "table": "person",
                "txId": 491,
                "lsn": 23476896,
                "xmin": null
            },
            "op": "c",
            "ts_ms": 1598317083583,
            "transaction": null
        },
        "__confluent_index": 0
    }
]
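Only the "key" and "value" objects in this listing are message payloads. The surrounding array and the topic, partition, offset, timestamp and headers fields are record metadata, which the consumer exposes separately through ConsumeResult. To test deserializers offline against an export like this, the minimal sketch below pulls out the two JSON payloads a consumer's key and value deserializers would work with. It assumes Newtonsoft.Json, and ExportSplitter and SplitRecords are hypothetical names:

```csharp
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical helper for offline testing; not part of Confluent.Kafka.
public static class ExportSplitter
{
    // Returns the key and value of each exported record as compact JSON strings.
    // Topic, partition, offset, timestamp and headers are skipped on purpose,
    // because they are record metadata rather than message payloads.
    public static List<(string Key, string Value)> SplitRecords(string exportJson)
    {
        var records = new List<(string Key, string Value)>();
        foreach (JToken record in JArray.Parse(exportJson))
        {
            string key = record["key"].ToString(Formatting.None);
            string value = record["value"].ToString(Formatting.None);
            records.Add((key, value));
        }
        return records;
    }
}
```

For the sample above, this yields a single pair whose key is {"person_id":1} and whose value is the object that starts with "before".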

I have the following class:

public class Person
{
    [JsonRequired]
    [JsonProperty("person_id")]
    public int PersonId { get; set; }

    [JsonRequired]
    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonProperty("favorite_color")]
    public string FavoriteColor { get; set; }

    [JsonProperty("age")]
    public int Age { get; set; }
}

I build a consumer with JSON key and value deserializers set:

using (var consumer = new ConsumerBuilder<TKey, TValue>(config)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]\n");
    })
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    })
    // Set JSON key and value deserializers
    .SetKeyDeserializer(new JsonDeserializer<TKey>().AsSyncOverAsync())
    .SetValueDeserializer(new JsonDeserializer<TValue>().AsSyncOverAsync())
    .Build())

Consuming the message produces this error:

JsonSerializationException : Required property 'person_id' not found in JSON. Path '', line 1, position 359.
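The exception follows from the shape of the message value. Its top-level properties are before, after, source, op, ts_ms and transaction, and the row's columns sit inside after, so a type that marks person_id as [JsonRequired] at the top level cannot match. The minimal sketch below reproduces the failure with Newtonsoft.Json on a trimmed copy of the value and shows that deserializing the after object succeeds. EnvelopeDemo and its members are hypothetical names:

```csharp
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Same shape as the Person class in the question.
public class Person
{
    [JsonRequired]
    [JsonProperty("person_id")]
    public int PersonId { get; set; }

    [JsonRequired]
    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonProperty("favorite_color")]
    public string FavoriteColor { get; set; }

    [JsonProperty("age")]
    public int Age { get; set; }
}

public static class EnvelopeDemo
{
    // Trimmed copy of the message value from the sample ("source" omitted).
    public const string ValueJson =
        "{\"before\":null,\"after\":{\"person_id\":1,\"name\":\"Tony Sneed\"," +
        "\"favorite_color\":\"Green\",\"age\":29},\"op\":\"c\"," +
        "\"ts_ms\":1598317083583,\"transaction\":null}";

    // Unknown top-level properties are ignored, but the required person_id
    // is never found, so Newtonsoft.Json throws JsonSerializationException.
    public static bool DirectDeserializeFails()
    {
        try
        {
            JsonConvert.DeserializeObject<Person>(ValueJson);
            return false;
        }
        catch (JsonSerializationException)
        {
            return true;
        }
    }

    // The row lives under "after", so deserializing that sub-object works.
    public static Person FromAfter() =>
        JObject.Parse(ValueJson)["after"].ToObject<Person>();
}
```

This is consistent with the resolution later in the thread: the value type has to model the envelope, not the row.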

What is the best way to deserialize a JSON message from Kafka Connect?

How to reproduce

See the json-converter branch on my repo: https://github.com/event-streams-dotnet/connect-event-streams/blob/json-converter/Consumer/Program.cs


mhowlett commented 4 years ago

Quoting: "I have a Postgres connector that writes the following message:"

I'm a bit confused. Is this the contents of the message value? (I'm not that familiar with Connect, so I'm guessing.) I don't think you can deserialize that as-is with the .NET JSON deserializer, because it's an array at the top level.

And what are TKey and TValue?

tonysneed commented 4 years ago

I figured it out. Using the Paste JSON as Code extension for VS Code, I generated Value and After classes from the message value:

public partial class Value
{
    [JsonProperty("before")]
    public object Before { get; set; }

    [JsonProperty("after")]
    public After After { get; set; }

    [JsonProperty("source")]
    public Source Source { get; set; }

    [JsonProperty("op")]
    public string Op { get; set; }

    [JsonProperty("ts_ms")]
    public long TsMs { get; set; }

    [JsonProperty("transaction")]
    public object Transaction { get; set; }
}

public partial class After
{
    [JsonProperty("person_id")]
    public long PersonId { get; set; }

    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonProperty("favorite_color")]
    public string FavoriteColor { get; set; }

    [JsonProperty("age")]
    public long Age { get; set; }
}
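The Value class refers to a Source class that isn't shown. The sketch below is inferred from the "source" object in the sample message; it is not the extension's actual output. Making txId, lsn and xmin nullable is an assumption, since xmin is null in the sample:

```csharp
using Newtonsoft.Json;

// Inferred from the "source" object in the sample message (assumption:
// txId, lsn and xmin may be null, as xmin is in the sample).
public partial class Source
{
    [JsonProperty("version")]
    public string Version { get; set; }

    [JsonProperty("connector")]
    public string Connector { get; set; }

    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonProperty("ts_ms")]
    public long TsMs { get; set; }

    // Sent as a string in the sample ("false"), so kept as a string here.
    [JsonProperty("snapshot")]
    public string Snapshot { get; set; }

    [JsonProperty("db")]
    public string Db { get; set; }

    [JsonProperty("schema")]
    public string Schema { get; set; }

    [JsonProperty("table")]
    public string Table { get; set; }

    [JsonProperty("txId")]
    public long? TxId { get; set; }

    [JsonProperty("lsn")]
    public long? Lsn { get; set; }

    [JsonProperty("xmin")]
    public long? Xmin { get; set; }
}
```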

Then I set the value deserializer to a JsonDeserializer<Value>. The outer wrapper in the sample (topic, partition, offset, timestamp, headers) is record metadata that the consumer already provides, so all that's left is to deserialize the key and value correctly.
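Because the generated Value class types before as object, delete events are awkward to read. One alternative is a generic envelope that picks the relevant row using Debezium's op code. The sketch below assumes that before and after always share the row's shape. ChangeEvent<TRow>, PersonRow and AffectedRow are hypothetical names, and source and transaction are left out and ignored during deserialization:

```csharp
using Newtonsoft.Json;

// Hypothetical generic alternative to the generated Value class.
public class ChangeEvent<TRow> where TRow : class
{
    [JsonProperty("before")]
    public TRow Before { get; set; }

    [JsonProperty("after")]
    public TRow After { get; set; }

    [JsonProperty("op")]
    public string Op { get; set; }

    [JsonProperty("ts_ms")]
    public long TsMs { get; set; }

    // Debezium op codes: "c" = create, "r" = snapshot read, "u" = update,
    // "d" = delete. Deletes carry the old row in Before and null in After.
    public TRow AffectedRow() => Op == "d" ? Before : After;
}

// Hypothetical row type mirroring part of the generated After class.
public class PersonRow
{
    [JsonProperty("person_id")]
    public long PersonId { get; set; }

    [JsonProperty("name")]
    public string Name { get; set; }
}
```

It would plug in the same way as the original call, for example `.SetValueDeserializer(new JsonDeserializer<ChangeEvent<PersonRow>>().AsSyncOverAsync())`, though this has not been checked against this connector. There are two caveats. First, with Postgres's default replica identity, before may hold only the key columns. Second, Debezium can follow a delete with a tombstone record whose value is null, so the consumer should null-check Message.Value.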

TKey and TValue are just type arguments. I made the Run_Consumer method in the sample generic.