apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Producer with a JSON schema built with RecordSchemaBuilder doesn't throw an error when an extra field is passed in the message payload. #20274

Open · vpeddada-tibco opened this issue 1 year ago

vpeddada-tibco commented 1 year ago


Version

Java client 2.11.0 connecting to broker 2.11.0. Broker instance OS: Amazon Linux 2023. Java application OS: Windows 10.

Minimal reproduce step

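    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.GenericRecord;
    import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
    import org.apache.pulsar.client.api.schema.GenericSchema;
    import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
    import org.apache.pulsar.client.api.schema.SchemaBuilder;
    import org.apache.pulsar.common.schema.SchemaInfo;
    import org.apache.pulsar.common.schema.SchemaType;
    import org.json.JSONArray;
    import org.json.JSONObject;

    // Assumes `pulsarClient` (a PulsarClient) and `topicName` are defined by the surrounding code.
    String jsonSchemaDef = "{\"name\": \"Employee\", \"fields\": [{\"name\": \"Name\", \"type\": \"string\"}, {\"name\": \"Age\", \"type\": \"int\"}]}";
    JSONObject schemaDefObj = new JSONObject(jsonSchemaDef);
    JSONArray fieldArray = schemaDefObj.getJSONArray("fields");
    RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record(schemaDefObj.getString("name"));
    // Create the record schema by walking the field list of the JSON definition.
    for (int i = 0; i < fieldArray.length(); i++) {
        JSONObject fieldObj = fieldArray.getJSONObject(i);
        String type = fieldObj.getString("type").toUpperCase();
        // SchemaType has no plain INT constant; a 32-bit integer field is INT32.
        SchemaType fieldType = type.equals("INT") ? SchemaType.INT32 : SchemaType.valueOf(type);
        recordSchemaBuilder.field(fieldObj.getString("name")).type(fieldType);
    }
    // Set the type of the schema definition to JSON.
    SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.JSON);
    GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
    Producer<GenericRecord> producerJson = pulsarClient.newProducer(schema)
            .topic(topicName)
            .create();
    GenericRecordBuilder recordBuilder = schema.newRecordBuilder();
    // Added an extra field, Mobile, that the schema does not define.
    String message = "{\"Employee\": {\"Age\": 35, \"Name\": \"TestUser\", \"Mobile\": 123469870}}";
    JSONObject employeeObj = new JSONObject(message).getJSONObject("Employee");
    // Build the payload with the GenericRecord builder, copying every field (including Mobile).
    for (String key : employeeObj.keySet()) {
        recordBuilder.set(key, employeeObj.get(key));
    }
    GenericRecord fieldRecord = recordBuilder.build();
    MessageId msgId = producerJson.newMessage().value(fieldRecord).send();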

What did you expect to see?

Because the message payload contains an extra field, Mobile, that the schema does not define, the Pulsar broker should reject the message and throw an error.

What did you see instead?

The broker acknowledges the message and the send operation succeeds.

Anything else?

We tried the same case with the Avro schema type, and it throws an error when an extra field is passed in the message payload.


HattoriHenzo commented 1 year ago

@vpeddada Your point is good, but I am not sure that message validation is the broker's role. It could be a good feature to implement, though.

lifepuzzlefun commented 1 year ago

@vpeddada Hi, can you share the namespace policies, such as SchemaCompatibilityStrategy? Also, does your demo upload a JSON schema to create a producer and then send a message built as an Avro record at runtime? Am I understanding that correctly? I just want to know the demo's background so I can try to reproduce your finding. :-)

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days; marking it with the Stale label.

RobertIndie commented 6 days ago

Quote from the reply here

Pulsar does not validate the structure of JSON messages against the topic's schema. It only checks if the producer's schema definition matches the broker's topic schema. In contrast, the Avro schema validates the message structure during both encoding and decoding.
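Since the quoted reply says that, for JSON topics, Pulsar only compares schema definitions and never inspects the payload, one workaround is to reject unknown fields in the producer application before copying them into the GenericRecordBuilder. The sketch below is a minimal, stdlib-only illustration under that assumption. `JsonPayloadGuard`, `unknownFields`, and `requireKnownFields` are hypothetical names, not part of the Pulsar API. The set of schema field names is assumed to be supplied by the caller; with the Pulsar client it could presumably be collected from `GenericSchema#getFields()` and `Field#getName()`, and an org.json payload can be turned into a map with `JSONObject#toMap()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the Pulsar API). Pulsar does not check JSON
// payload fields against the topic schema, so the producer application checks them.
final class JsonPayloadGuard {

    private JsonPayloadGuard() {
    }

    // Returns the payload keys that the schema does not declare, in the map's iteration order.
    static List<String> unknownFields(Set<String> schemaFields, Map<String, ?> payload) {
        List<String> unknown = new ArrayList<>();
        for (String key : payload.keySet()) {
            if (!schemaFields.contains(key)) {
                unknown.add(key);
            }
        }
        return unknown;
    }

    // Throws IllegalArgumentException if the payload carries any field outside the schema.
    // Only field names are checked; value types are not validated here.
    static void requireKnownFields(Set<String> schemaFields, Map<String, ?> payload) {
        List<String> unknown = unknownFields(schemaFields, payload);
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("Payload fields not in schema: " + unknown);
        }
    }
}
```

For example, with schema fields `Name` and `Age` and a `LinkedHashMap` payload holding `Age`, `Name`, and `Mobile`, `unknownFields` returns `[Mobile]` and `requireKnownFields` throws before any message is sent. The check is deliberately limited to field names, which is the gap described in this issue; type checking would need more schema information than a set of names.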