apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0

[bug] Schema validation not working: producing messages that are non-compliant with the schema is allowed #1296

Open calindima opened 1 month ago

calindima commented 1 month ago

Expected behavior

Setup:

- A topic with the JSON schema below registered for it
- schemaValidationEnforced enabled on the topic
- isAllowAutoUpdateSchema disabled on the namespace
- A producer created with that same JSON schema

Expected:

- Sending a message whose payload does not comply with the schema is rejected.

Actual behavior

The producer created with a schema can publish non-compliant messages. The payload is not validated against the schema.

Steps to reproduce

The first 3 steps from setup can be done through the admin API:

curl --request POST \
  --url $BASE_URL/admin/v2/schemas/$TENANT/$NAMESPACE/$TOPIC/schema \
  --header 'Content-Type: application/json' \
  --data '{
        "type": "JSON",
        "schema": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n  \"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": \"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": \"int\"\n    }\n  ]\n}",
        "properties": {}
    }'
curl --request POST \
  --url $BASE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/schemaValidationEnforced \
  --header 'Content-Type: application/json' \
  --data true
curl --request POST \
  --url $BASE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/isAllowAutoUpdateSchema \
  --header 'Content-Type: application/json' \
  --data false

The code:

package main

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
    pulsarauth "github.com/apache/pulsar-client-go/pulsar/auth"
    "github.com/google/uuid"
)

type SchemaTest struct {
    UserName string `json:"userName" avro:"userName"`
    UserAge  int    `json:"userAge" avro:"userAge"`
}

type WrongSchema struct {
    NotUserName string `json:"notUserName" avro:"notUserName"`
    NotUserAge  int    `json:"notUserAge" avro:"notUserAge"`
}

func main() {
    slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

    // Replace these placeholders with your own service URL, topic and token.
    serviceURL := "pulsar://localhost:6650"
    topic := "persistent://my-tenant/my-namespace/my-topic"
    token := "insert-your-token-here"
    authProvider := pulsar.NewAuthenticationToken(token)

    pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            serviceURL,
        Authentication: authProvider,
    })
    if err != nil {
        slog.Error("Error creating the pulsar client", "error", err)
        os.Exit(1)
    }
    defer pulsarClient.Close()

    producerName := fmt.Sprintf("SchemaProducer-%s", uuid.NewString()[0:6])

    schemaDef := `{
  "type": "record",
  "name": "SchemaTest",
  "fields": [
    {
      "name": "userName",
      "type": "string"
    },
    {
      "name": "userAge",
      "type": "int"
    }
  ]
}`
    schemaExample := pulsar.NewJSONSchema(schemaDef, nil)

    // The producer is bound to the JSON schema above.
    producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
        Name:   producerName,
        Topic:  topic,
        Schema: schemaExample,
    })
    if err != nil {
        slog.Error("Error creating the pulsar producer", "error", err)
        os.Exit(1)
    }
    defer producer.Close()

    for {
        //goodMsg := &SchemaTest{
        //  UserName: "JohnDoe",
        //  UserAge:  30,
        //}
        badMsg := &WrongSchema{
            NotUserName: "JohnDoe",
            NotUserAge:  30,
        }

        msgId, sendErr := producer.Send(context.Background(), &pulsar.ProducerMessage{
            //Value: goodMsg,
            Value: badMsg,
            //Schema: schemaExample,
        })
        if sendErr != nil {
            slog.Error("Error sending message", "error", sendErr)
            break
        }

        //slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", goodMsg))
        slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", badMsg))
        time.Sleep(time.Second * 1)
    }
}

The schema from the registry (HTTP response dump):

{
    "version": 0,
    "type": "JSON",
    "timestamp": 1728915217796,
    "data": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n  \"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": \"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": \"int\"\n    }\n  ]\n}",
    "properties": {}
}

System configuration

Pulsar version: 3.0.6.8

RobertIndie commented 3 weeks ago

Pulsar does not validate the structure of JSON messages against the topic's schema. It only checks if the producer's schema definition matches the broker's topic schema. In contrast, the Avro schema validates the message structure during both encoding and decoding. This is the same for the Java client.

If you need the message structure verified at send time, you could use the Avro schema (see the Go sketch below). Otherwise, you need to make sure yourself that the message structure matches the producer's schema.
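
A minimal Go sketch of that approach, assuming the same schema definition and structs as in the report above (the service URL and topic name here are placeholders):

package main

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

// Same shapes as in the report above.
type SchemaTest struct {
    UserName string `json:"userName" avro:"userName"`
    UserAge  int    `json:"userAge" avro:"userAge"`
}

type WrongSchema struct {
    NotUserName string `json:"notUserName" avro:"notUserName"`
    NotUserAge  int    `json:"notUserAge" avro:"notUserAge"`
}

func main() {
    // Placeholder URL; adjust for your cluster and add authentication if needed.
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    schemaDef := `{"type": "record", "name": "SchemaTest", "fields": [
        {"name": "userName", "type": "string"},
        {"name": "userAge", "type": "int"}]}`

    // AvroSchema (unlike JSONSchema) validates the value against the Avro
    // definition while encoding, so a mismatched struct fails at Send time.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:  "persistent://my-tenant/my-namespace/my-topic",
        Schema: pulsar.NewAvroSchema(schemaDef, nil),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Compliant message: encodes and sends successfully.
    if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Value: &SchemaTest{UserName: "JohnDoe", UserAge: 30},
    }); err != nil {
        log.Printf("unexpected error for compliant message: %v", err)
    }

    // Non-compliant message: the Avro encoder rejects it.
    if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Value: &WrongSchema{NotUserName: "JohnDoe", NotUserAge: 30},
    }); err != nil {
        log.Printf("send rejected as expected: %v", err)
    }
}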

Here's a similar example in Java:

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        String schemaDef = "{\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"SchemaTest\",\n" +
                "  \"fields\": [\n" +
                "    { \"name\": \"userName\", \"type\": \"string\" },\n" +
                "    { \"name\": \"userAge\", \"type\": \"int\" }\n" +
                "  ]\n" +
                "}";

        SchemaInfo schemaInfo = SchemaInfo.builder()
                .name("SchemaTest")
                .type(SchemaType.JSON)
                .schema(schemaDef.getBytes())
                .build();

        GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);

        Producer<GenericRecord> producer = client.newProducer(schema)
                .topic("test-schema")
                .create();

        GenericRecord record = schema.newRecordBuilder()
                .set("notUserName", "Alice")
                .set("notUserAge", 30)
                .build();

        producer.send(record);

        System.out.println("Message sent!");

        producer.close();
        client.close();

The message will be sent successfully without validating the schema.

calindima commented 3 weeks ago

I expected JSONSchema to behave much like AvroSchema, since the two are very close: the schema definition is Avro for both, and only the type in SchemaInfo differs, along with, apparently, how message validation works. That might be a naive view, since I haven't looked at how they differ in the internals of Pulsar.

Shouldn't the client library ensure that messages sent through a producer with a schema adhere to that schema? This .NET library, for example, returns a typed producer that can't send any other type of message.

My impression of Pulsar's schema validation feature is that it should provide guardrails that establish a contract between producers and consumers.

RobertIndie commented 2 weeks ago

I expected JSONSchema to behave much like AvroSchema, since the two are very close: the schema definition is Avro for both, and only the type in SchemaInfo differs, along with, apparently, how message validation works. That might be a naive view, since I haven't looked at how they differ in the internals of Pulsar.

When the client encodes an Avro message, the Avro encoder automatically validates it against the schema. JSON schema encoding works differently: we do not pass the schema definition to the encoder; instead, we encode any object type to JSON bytes, and the JSON encoder performs no validation. This is the same for both the Go and Java clients.
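
Roughly speaking, JSON schema encoding in the Go client boils down to a plain json.Marshal of the value; the schema definition is never consulted (simplified sketch, not the actual source):

package main

import "encoding/json"

// Simplified sketch: the value is marshalled straight to JSON bytes, so any
// marshallable struct passes through regardless of the registered schema.
func encodeJSONValue(value interface{}) ([]byte, error) {
    return json.Marshal(value)
}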

Shouldn't the client library ensure that messages sent through a producer with a schema adhere to that schema? This .NET library, for example, returns a typed producer that can't send any other type of message. My impression of Pulsar's schema validation feature is that it should provide guardrails that establish a contract between producers and consumers.

Client SDKs such as .NET and Java can use generics to restrict the type of messages a producer or consumer sends or receives. The Go producer and consumer do not use generics; in fact, they are closer to the Java client's Producer<byte[]>. Since Go has supported generics since 1.18, it might be worthwhile to implement a similar feature for the Go producer and consumer.
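
As a rough illustration of what a generics-based wrapper could look like on top of the current API (the names below are hypothetical; nothing like this exists in the library today):

package typed

import (
    "context"

    "github.com/apache/pulsar-client-go/pulsar"
)

// Producer is a hypothetical type-safe wrapper around the existing producer:
// callers can only hand it values of type T, so a payload of an unrelated
// type cannot be sent through it by accident.
type Producer[T any] struct {
    inner pulsar.Producer
}

func NewProducer[T any](client pulsar.Client, opts pulsar.ProducerOptions) (*Producer[T], error) {
    p, err := client.CreateProducer(opts)
    if err != nil {
        return nil, err
    }
    return &Producer[T]{inner: p}, nil
}

// Send only exposes Value; the raw Payload field is deliberately hidden so the
// schema cannot be bypassed through this wrapper.
func (tp *Producer[T]) Send(ctx context.Context, value *T) (pulsar.MessageID, error) {
    return tp.inner.Send(ctx, &pulsar.ProducerMessage{Value: value})
}

func (tp *Producer[T]) Close() {
    tp.inner.Close()
}

This still relies on the schema passed in ProducerOptions matching T, but it closes the door on sending an unrelated struct through the same producer.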

calindima commented 1 week ago

When the client encodes an Avro message, the Avro encoder automatically validates it against the schema. JSON schema encoding works differently: we do not pass the schema definition to the encoder; instead, we encode any object type to JSON bytes, and the JSON encoder performs no validation. This is the same for both the Go and Java clients.

Thanks for pointing that out; it's something I had noticed and it left me wondering why there's no validation. If I understand correctly, there's nothing to do for now besides accepting it and working around it, e.g. using AvroSchema instead when validation is needed.

Client SDKs such as .NET and Java can use generics to restrict the type of messages a producer or consumer sends or receives. The Go producer and consumer do not use generics; in fact, they are closer to the Java client's Producer<byte[]>. Since Go has supported generics since 1.18, it might be worthwhile to implement a similar feature for the Go producer and consumer.

I think it would be interesting to offer this. For example, I noticed that although AvroSchema offers the validation we were looking for, you can still bypass the schema entirely by sending a message with Payload instead of Value. So even though you create a producer with a schema, you can still publish incompatible messages, which leaves a lot of checks in the hands of producers instead of giving them a paved path for this feature. It would be a nicer experience if the library guided some of these use cases.

Thanks for taking the time to respond to all of this. I'm trying to better understand the existing functionality, what we could contribute, and what needs to be taken as is.

calindima commented 1 week ago

@RobertIndie

For example, I noticed that although AvroSchema offers the validation we were looking for, you can still bypass the schema entirely by sending a message with Payload instead of Value. So even though you create a producer with a schema, you can still publish incompatible messages, which leaves a lot of checks in the hands of producers instead of giving them a paved path for this feature.

Would it make sense for me to contribute some extra validation that disallows using Payload when a schema is set, so that schema validation can't be bypassed? I think this could be added here.
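
For illustration only, the kind of guard I have in mind might look roughly like this (hypothetical helper, not existing pulsar-client-go code):

package main

import (
    "errors"

    "github.com/apache/pulsar-client-go/pulsar"
)

// validateProducerMessage is an illustrative sketch: reject a raw Payload on a
// producer that was created with a schema, so the schema cannot be bypassed.
func validateProducerMessage(hasSchema bool, msg *pulsar.ProducerMessage) error {
    if hasSchema && msg.Payload != nil {
        return errors.New("producer has a schema: set Value instead of Payload")
    }
    return nil
}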