confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html
Other
2.22k stars 1.11k forks source link

JSON Schema Support #220

Closed ottomata closed 4 years ago

ottomata commented 9 years ago

Avro is cool!

But lots of people use JSON. JSON Schema allows folks to use JSON with a strictly defined schema. Wouldn't it be nice if schema-registry (and Kafka REST Proxy) supported JSON Schema?

JSON Schema wouldn't be as good as Avro, as there is no schema evolution feature. However, it would be nice if one could produce JSON to Kafka REST Proxy and be sure that the data matched a registered schema. Thoughts? How hard would this be?

ewencp commented 9 years ago

JSON Schema is nice and in some ways more powerful than Avro -- the validations make it a more complex spec, but make the requirements for data much clearer.

I think the biggest gap for JSON Schema is that (as far as I know) it doesn't have the same compatibility specification and tools, so they'd probably have to be implemented from scratch. The rest of the registry should look largely the same between Avro and JSON Schema -- the schema format is JSON in both cases and otherwise the registry is mostly just doing storage/lookup. I would guess that creating the branches to handle JSON Schema should be mostly straightforward, and then the branches would reconverge when they hit the storage layer.

There would be some important design decisions, however, which might not be straightforward to resolve. For example, how do we differentiate between the types in the API (complex content types like in REST Proxy?). And do we need to support both Avro and JSON in one cluster or is the expectation that you go entirely with one or the other? That has implications on how data is stored, if there's a need for some additional namespacing, how you handle conflicts if you try to register different types to the same subject, etc.

In other words, there's probably a quick and dirty version that could be made to work pretty easily, but cleanly supporting all the features that the Avro version does would be a significantly more substantial time investment.

ottomata commented 9 years ago

do we need to support both Avro and JSON in one cluster or is the expectation that you go entirely with one or the other?

In my ideal world, yes. I would love to be able to just convince everyone that they should use Avro and not worry about it, but there is so much JSON (and JSON Schema) in our org already, that in order to use confluent systems, I think we are going to have to support both JSON Schema and Avro.

ewencp commented 9 years ago

Right, I totally get the need for JSON, it was more a question of whether you're adopting Avro at all such that a mixed mode is useful to you :)

ottomata commented 9 years ago

Hm, related question.

Instead of JSON Schema, would it be easier to make the REST Proxy produce the JSON encoding of the Avro data and still validate that against a schema?

That is, instead of validating and converting the POSTed Avro-JSON records into Avro-Binary and then producing them to Kafka, could REST Proxy validate the Avro-JSON records and then produce them as is to Kafka, i.e. as JSON text data?

This would be the best of both worlds, and allow users to choose to use validated JSON directly, while still enforcing that the JSON conform to a schema.

AdamDz commented 8 years ago

+1 for JSON Schema support. The Confluent Platform is Avro-centric, which limits its usage. We are building a platform that has API specification in RAML and JSON Schema, and don't want to maintain definitions of data types in two formats.

yhilem commented 8 years ago

+1 for JSON Schema support.

hakamairi commented 7 years ago

Any plans for this feature?

manuelfu commented 7 years ago

+1 for JSON Schema support.

tim-jones-001 commented 7 years ago

+1 for JSON schema support

eparisca commented 7 years ago

+1 for JSON schema support

GreenAsh commented 7 years ago

+1 for JSON schema support

kolprogman commented 7 years ago

+1 for JSON schema support

lukoyanov commented 7 years ago

+1 for JSON schema support

dgvj-work commented 7 years ago

+1 for JSON schema support

lafolle commented 7 years ago

+1 for JSON schema support

fernandomoraes commented 7 years ago

+1 for JSON schema support

juguyard commented 6 years ago

+1 for JSON schema support!

jchrist31an commented 6 years ago

+1 for JSON schema support

Brian-Burns-Bose commented 6 years ago

+1 especially since OpenAPI / Swagger utilizes JSON schema. JSON schema also has much better support for value constraints.

solsson commented 6 years ago

In case this feature gets near a roadmap, https://github.com/joelittlejohn/jsonschema2pojo is pretty great. I used it for an ad-hoc topic-schema mapping outlined in https://github.com/Yolean/kubernetes-kafka/issues/101#issuecomment-355940612.

codeislyric commented 6 years ago

+1 for JSON Schema support.

ottomata commented 6 years ago

I've been thinking about this more, and for my own use case, as I said earlier, all I really want is to be able to use schema validated JSON strings in Kafka. Avro binary in Kafka adds a lot of complications, including a dependency on Confluent Schema Registry (or something) to consume data. However, I really want to be able to use Kafka Connect and other fancy tools, which are pretty difficult to use with JSON.

Perhaps augmenting Schema Registry + REST Proxy (maybe just REST Proxy?) to avoid converting incoming Avro-JSON to Avro-Binary before producing to Kafka would be much easier, and solve many of the use cases that JSON Schema support would. It could be easier to make Kafka Connect know how to convert from Avro-JSON to its internal data model than relying on JSONSchemas. I wonder how hard this would be...

solsson commented 6 years ago

I'd be happy to see this feature (implemented, yes, but also... if I take the liberty of thinking out loud here...) discussed in a wider context of tooling for schemas and topics, geared towards microservices. @ept's https://www.confluent.io/blog/put-several-event-types-kafka-topic/ is a great starting point.

Asynchronous communication via Kafka is, as Confluent convincingly argues, a compelling alternative to REST for contracts between services. However, while REST has service discovery, Istio Mesh and Swagger, we Kafka-hopefuls have... well... topic names and opaque Serdes.

What is it we need to manage about topics?

What are the options for how a producer - be it protobuf or JSON or Avro - can specify the schema of a record?

Many services use JSON natively, and not all of them can easily adopt an Avro library, let alone serdes that speak Schema Registry. For example, kafkacat is a great tool for testing and troubleshooting, but the interest for avro there looks about as low as the interest for JSON here.

With REST + Swagger you might use build time code generation for the contracts your dependency exports. I haven't seen any such approaches for Kafka. Would it require service discovery for topics?

Has anyone seen this kind of discussion anywhere? I've been on the lookout, but no luck so far.

ottomata commented 6 years ago

Had more time to think about this today. I just sent an email to the Kafka users mailing list to start a discussion, but since there are so many folks who have +1ed this, I'll post here too.

The more I think about this, I realize that I don't care that much about JSON support in Confluent products. What I really want is the ability to use Kafka Connect with JSON data. Kafka Connect does sort of support this, but only if your JSON messages conform to its very specific envelope schema format. (If Confluent products made this possible, I'd use them. :) )

What if…Kafka Connect provided a JSONSchemaConverter (not Connect’s JsonConverter), that knew how to convert between a provided JSONSchema and Kafka Connect internal Schemas?
This should allow for configuration of Connectors with JSONSchemas to read JSON messages directly from a Kafka topic. Once read and converted to a ConnectRecord, I believe the messages could be used with any Connector out there.

So, my question here is: is this what most of the +1ers here want too? Or do you specifically want JSONSchema support in Confluent Schema Registry?

Tapppi commented 6 years ago

What if…Kafka Connect provided a JSONSchemaConverter (not Connect’s JsonConverter), that knew how to convert between a provided JSONSchema and Kafka Connect internal Schemas?

This would solve a lot of problems, and be more sensible than the current setup for connect. Using connect to output arbitrary processed data is a pain at the moment.

Or do you specifically want JSONSchema support in Confluent Schema Registry?

This would solve even more problems, but mainly, because I see this as a superset of the above. A JSONSchemaConverter would be a different (more primitive) way of getting this functionality in only Kafka Connect. If it would work as a stepping stone to remote JSONSchema support from Schema Registry, that would be even more wonderful.

Specifically, I am working on implementing a subset of Kafka Streams functionality for Node.js in TypeScript, and Avro is a strange beast to get in there. Writing up a Node.js libserdes wrapper (schema registry & avro support) and integrating that is on the roadmap, but that still comes with the hurdle of most Node.js shops using primarily JSON.

I think supporting JSON schemas would go a long way towards helping adoption of Kafka for orgs like ours that deal mainly in Node.js and JSON. There's definitely interest and lots of possibilities, but the ecosystem is currently quite far out of reach due to stack differences.

joewood commented 6 years ago

@ottomata so your requirement is more about schema integration for sourcing JSON sources? One of the problems I have with the Connect Schema object is it's simplicity. Schema parity with JSON schema will be difficult.

@Tappi - your usecase is more similar to ours (and sounds very interesting, is it similar to https://github.com/nodefluent/kafka-streams?). The ability to generate TS types based on schema is definitely something I'm looking at.

One approach that we discussed with Confluent while they were last onsite was a best efforts conversion in the REST interface, maybe using the MIME type to render the appropriate meta-model format. Fore example: - a schema GET with Accept: application/schema+json would best efforts convert the stored Avro schema to JSON schema.

In addition, it would be much better to make Avro and JSON usage more consistent. The KafkaAvroConverter uses the schema registry to source the schema, but the JSONConverter sends the schema in every message - which is really impractical for anything other than trivially small message schemas. It would be great to have a way of propagating the schema without the overhead and message bloat. So, not necessarily full JSON schema, but enough to support the connect Schema object and validate a JSON payload. I guess this could be done by copying the bulk of KafkaAvroSerializer and providing a JSON equivalent, complete with schema cache.

ottomata commented 6 years ago

@ottomata so your requirement is more about schema integration for sourcing JSON sources?

Yes. We rely pretty heaviliy on JSON and JSONSchema now. What I really want is to keep JSON in Kafka, and still be able to use Kafka Connect. REST Proxy and Schema Registry might help with this, but they don't have to. Perhaps allowing REST Prroxy to emit the JSON encoded Avro event, rather than binary, would be enough for us? We will be evaluating our options over the next few months.

Tapppi commented 6 years ago

@joewood

and sounds very interesting, is it similar to https://github.com/nodefluent/kafka-streams?

Yes and no, as nodefluent/kafka-streams aims to be a "equivalent" node version of streams, but not functionally equivalent. For example, it does not support local state stores, and due to the architecture they would work very differently from Kafka Streams.

My port aims to be mostly API and functionally compatible with Kafka Streams, while mangling some of the internals to simpler versions (Jesus there is a lot of Java bloat in the Kafka Streams code X_X). This is also the reason for typescript, as it let's me create a consistent typed API that eases the porting.

I was basically duplicating Kafka Streams Processor API independently (with some minor differences) while working on my own Kafka lib, so I instead decided to take the Kafka Streams API and just rework some of the internals. Right now it seems that state store support will also be very similar to the Java version, but some of the constructors etc. might be simplified.

The ability to generate TS types based on schema is definitely something I'm looking at.

This is very interesting, I'll take a look at this!

a schema GET with Accept: application/schema+json would best efforts I guess this could be done by copying the bulk of KafkaAvroSerializer and providing a JSON equivalent, complete with schema cache.

This would also solve most of our problems. I'll think some more about this

In addition, it would be much better to make Avro and JSON usage more consistent. The KafkaAvroConverter uses the schema registry to source the schema, but the JSONConverter sends the schema in every message

Yes, this is the reason we are now looking for this JSON support, because we cannot send the schema with every message. Right now we have to create a new topic with very limited retention and add schemas while forwarding messages to it to use connect. Actually, I wonder if we could add the schema statically with SMTs 🤔 Gotta test this out!

gaui commented 6 years ago

We rely heavily on JSON at our organization. Today we only use Avro for Kafka and Open API / Swagger (which complies to the JSON Schema spec http://json-schema.org). https://swagger.io/docs/specification/data-models/keywords/

By adding JSON Schema support to Schema Registry, we would be able to use a single schema standard in all our services - to make sure all our service contracts are in sync.

suresh-krishnamurthy commented 6 years ago

+1 for JSON Schema support.

elric-k commented 6 years ago

+1 for JSON schema support

rhoeting commented 6 years ago

+1 for JSON Schema Support

pablo-ct commented 6 years ago

+1 for adding JSON Schema support to Schema Registry

bozidarpasagic commented 6 years ago

+1 for JSON Schema Support

fhanin commented 6 years ago

+1 for JSON Schema Support

ccamel commented 6 years ago

+1 for JSON Schema Support

solsson commented 6 years ago

For everyone who's +1, how do you envision messages to identify the schema? As some kind of custom prefix to the value, or a $schema key, or based on topic names or something else? Any requirements on schema evolution?

The main benefit of JSON encoded values as I see it is that it does not require a schema to be decoded. I'd be hesitant to introduce a runtime dependency (like schema registry) for consuming JSON topics.

This is not an argument against JSON-schema support, I'm just curious what properties of JSON encoding+schema people prioritize.

ottomata commented 6 years ago

What I mostly want, is an easy way to map from a JSON message to a JSONSchema. That, + Kafka Connect integration that maps from JSONSchema to a Connect Schema, allowing for easy integration of JSON messages into any system that has Kafka Connector sinks.

For my use case, we have a schema_uri field embedded in all of our messages. In another, older system, schemas can be looked up via this URI. Our schema_uris encompass a schema name and a version. E.g. mediawiki/revision/create/2. Leave off the version, and you get the latest.

I'd hope that the mechanism for discovering the schema/version for a given message would be pluggable.

As for schema evolution, I think we could only support additions of new optional fields. Any other schema changes would not be compatible.

joewood commented 6 years ago

The convention of using the topic name with -key or -value suffix is the most common. My use cases would be:

To minimize the impact of this requirement, a simple extensibility point could be used in the Schema Registry to support other MIME types through an external JAR. This may be the simplest way to solve this and potential future requests.

ottomata commented 6 years ago

FYI (especially for @Tapppi ), I've made an attempt at a JSONSchemaConverter for Kafka Connect here: https://github.com/ottomata/kafka-connect-jsonschema. It is very proof-of-concept at the moment, but works! There are still lots of pieces it needs to be even close to complete.

philippbauer commented 6 years ago

+1 for adding JSON Schema support to Schema Registry

ericfranckx commented 6 years ago

+1 for adding JSON Schema support to Schema Registry

sumannewton commented 5 years ago

+1 for JSON schema support

florintene commented 5 years ago

+1 for adding JSON Schema support to Schema Registry

migalho commented 5 years ago

+1 for JSON schema support

gaui commented 5 years ago

Is this scheduled?

rayokota commented 5 years ago

@gaui, this is being worked on, delivery date TBD.

ottomata commented 5 years ago

FWIW, Wikimedia is going another route. We considered building in JSONSchema support to the Confluent Schema Registry, and decided that it would be too difficult for us to do. Avro doesn't really do 'validation', it just fails if serialization from JSON -> Avro fails. The modifications to Schema Registry seemed large enough that it would be too difficult to fork, modify and upstream changes. In hindsight this was a good decision, since Confluent has moved away from a fully open source license, and Wikimedia is very picky about these things :)

Since in general, JSON does not require schemas to read data, our use case is mostly around a service that will flexibly validate JSON events and then produce them to Kafka (or elsewhere). This use case is more like what Kafka REST Proxy provides than Schema Registry. We are implementing a library and service in NodeJS that will do just this.

https://github.com/wikimedia/eventgate

EventGate is still WIP, but we hope to deploy our first production use case of it in the next month or so.

EventGate is schema registry agnostic; as long as your event's schema can be looked up from a URI (local file:// is fine!) then it can find the schema and validate your event. It is only opinionated in that it expects that your event contains the schema URL in it somewhere.

We plan to host schemas simply using a git repo + an http file server. Schema evolution and compatibility will be enforced by a CI process. This will allow us to decentralize schemas, and allow developers to use and develop schemas in the same way that they do with code, rather than having to POST schemas to a centralized store before deploying code that uses them.

(If you are crazy and want to learn more, all of our project plans are public here https://phabricator.wikimedia.org/T185233)

OneCricketeer commented 5 years ago

As far as having a Java SerDe goes, I was able to find one that supported Jackson and the latest drafts of JSON-schema. https://github.com/worldturner/medeia-validator

For the most part, I was able to wrap the existing json-serializer module of this repo with methods of that library.

Proof of concept, still, but first draft was that schemas definitions are only available on the classpath.

Base class

public abstract class AbstractKafkaJsonSchemaSerde {

    protected MedeiaJacksonApi api;

    public AbstractKafkaJsonSchemaSerde() {
        this.api = new MedeiaJacksonApi();
    }

    protected SchemaValidator getSchemaValidator(URL schemaURL) {
        // TODO: Hook in schema-registry here
        SchemaSource source = new UrlSchemaSource(schemaURL);
        return api.loadSchema(source);
    }

    public abstract SchemaValidator getSchemaValidator();
}

Serializer

    @Override
    public SchemaValidator getSchemaValidator() {
        // the configure method ensures the resource is non-null
        return getSchemaValidator(getClass().getResource(schemaResource));
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }

        JsonGenerator validatedGenerator = api.decorateJsonGenerator(schemaValidator, unvalidatedGenerator);
        try {
            objectMapper.writeValue(validatedGenerator, data);
            byte[] bytes = baos.toByteArray();
            baos.reset(); // new calls to serialize would otherwise append onto the stream
            return bytes;
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

Deserializer

    @Override
    public T deserialize(String ignoredTopic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        try {
            JsonParser unvalidatedParser = objectMapper.getFactory().createParser(bytes);
            JsonParser validatedParser = api.decorateJsonParser(schemaValidator, unvalidatedParser);
            return objectMapper.readValue(validatedParser, type);
        } catch (JsonParseException | JsonMappingException e) {
            throw new SerializationException("Unable to parse JSON into type \"" + type + "\".", e);
        } catch (IOException e) {
            throw new SerializationException(e);
        } catch (ValidationFailedException e) {
            throw new SerializationException("Failed to validate input data against schema resource "
                    + schemaResource, e);
        }
    }

Given this basic schema

{
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Person",
  "type": "object",
  "properties": {
    "firstName": {
      "type": "string",
      "description": "The person's first name."
    },
    "lastName": {
      "type": "string",
      "description": "The person's last name."
    },
    "age": {
      "description": "Age in years which must be equal to or greater than zero.",
      "type": "integer",
      "minimum": 0
    }
  }
}

And a Jackson class

    public static Person createPerson(int age) {
        Person person = new Person();
        person.setFirstName("fname");
        person.setLastName("lname");
        person.setAge(age);
        return person;
    }

We run a serializer

 KafkaJsonSchemaSerializer<Person> s = new KafkaJsonSchemaSerializer<>();
 configureSerializer(s, "/schemas/person/person-min-age.json");
 Person p = createPerson(-1); // cause a schema invalidation with a negative age
 s.serialize("topic", p);

Causes an exception, as expected like

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.JsonMappingException: [Validation Failure
------------------
Rule:     properties
Property: age
Message:  Property validation failed
Location: at 
Details:
    Rule:     minimum
    Message:  Value -1 is smaller than minimum 0
    Location: at 
    -----
] (through reference chain: model.Person["age"])
gaui commented 5 years ago

Is there some rough ETA on this one?