jcustenborder / kafka-connect-json-schema

Apache License 2.0

java.lang.UnsupportedOperationException: Schema type is not supported. #8

Closed: cjmatta closed this issue 4 years ago

cjmatta commented 4 years ago

The FromJson transformation fails during configuration with this schema from Wikimedia: https://raw.githubusercontent.com/wikimedia/mediawiki-event-schemas/bc8ba8281d7adb7a51771c2068b8eda25dc38eeb/jsonschema/mediawiki/recentchange/1.0.0.json

Full stack trace:

org.apache.kafka.connect.errors.ConnectException: java.lang.UnsupportedOperationException: Schema type is not supported. org.everit.json.schema.CombinedSchema:{\"description\":\"ID of the recentchange event (rcid).\",\"anyOf\":[{\"type\":\"integer\"},{\"type\":\"null\"}]}
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:264)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:513)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:465)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Schema type is not supported. org.everit.json.schema.CombinedSchema:{\"description\":\"ID of the recentchange event (rcid).\",\"anyOf\":[{\"type\":\"integer\"},{\"type\":\"null\"}]}
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter.fromJSON(FromJsonSchemaConverter.java:89)
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter$ObjectSchemaConverter.lambda$fromJSON$0(FromJsonSchemaConverter.java:170)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:352)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter$ObjectSchemaConverter.fromJSON(FromJsonSchemaConverter.java:165)
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter$ObjectSchemaConverter.fromJSON(FromJsonSchemaConverter.java:140)
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter.fromJSON(FromJsonSchemaConverter.java:104)
    at com.github.jcustenborder.kafka.connect.json.FromJsonSchemaConverter.fromJSON(FromJsonSchemaConverter.java:75)
    at com.github.jcustenborder.kafka.connect.json.FromJson.configure(FromJson.java:155)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:261)
    ... 10 more

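It looks like the converter does not handle fields declared with anyOf. The field in the error is a nullable integer (anyOf of integer and null), which the everit library parses as a CombinedSchema, and FromJsonSchemaConverter.fromJSON rejects that type.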
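
For illustration, here is a minimal sketch of one way a nullable anyOf could be collapsed into a single optional type. It is not the converter's actual code: the class NullableAnyOf and its Resolved result are hypothetical names, and the schema is modelled as plain Maps and Lists instead of the everit classes so the example has no dependencies. It assumes the only anyOf shape worth supporting is exactly one non-null type plus an optional null branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of kafka-connect-json-schema.
public class NullableAnyOf {

    /** The single non-null JSON type, plus whether a null branch was present. */
    public static final class Resolved {
        public final String type;
        public final boolean optional;

        Resolved(String type, boolean optional) {
            this.type = type;
            this.optional = optional;
        }
    }

    private static UnsupportedOperationException unsupported(Object schema) {
        return new UnsupportedOperationException("Schema type is not supported. " + schema);
    }

    /** Resolves a plain {"type": ...} schema or an anyOf of one type and null. */
    public static Resolved resolve(Map<String, Object> schema) {
        Object anyOf = schema.get("anyOf");
        if (!(anyOf instanceof List)) {
            // No anyOf: fall back to the plain "type" keyword, treated as required.
            Object type = schema.get("type");
            if (!(type instanceof String)) {
                throw unsupported(schema);
            }
            return new Resolved((String) type, false);
        }

        boolean optional = false;
        List<String> nonNullTypes = new ArrayList<>();
        for (Object branch : (List<?>) anyOf) {
            Object type = branch instanceof Map ? ((Map<?, ?>) branch).get("type") : null;
            if (!(type instanceof String)) {
                throw unsupported(schema);
            }
            if ("null".equals(type)) {
                optional = true;            // a null branch makes the field optional
            } else {
                nonNullTypes.add((String) type);
            }
        }
        // Anything other than exactly one concrete type is still rejected.
        if (nonNullTypes.size() != 1) {
            throw unsupported(schema);
        }
        return new Resolved(nonNullTypes.get(0), optional);
    }

    public static void main(String[] args) {
        // The property from the stack trace above.
        Map<String, Object> id = new HashMap<>();
        id.put("description", "ID of the recentchange event (rcid).");
        id.put("anyOf", Arrays.asList(
                Collections.singletonMap("type", "integer"),
                Collections.singletonMap("type", "null")));

        Resolved r = resolve(id);
        System.out.println(r.type + " optional=" + r.optional); // prints "integer optional=true"
    }
}
```

In a real fix the resolved type would presumably be mapped to the matching Connect schema and marked optional. A genuine union such as anyOf of string and integer would still be rejected by this sketch.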

Example record:

{
    "$schema": "/mediawiki/recentchange/1.0.0",
    "meta": {
        "uri": "https://sl.wikipedia.org/wiki/Iztrebek",
        "request_id": "61a2da40-3fea-450f-8404-8ed5e98e784a",
        "id": "5fe0b22e-1ed1-4acd-9e06-07a39ef800e7",
        "dt": "2020-05-14T16:24:27Z",
        "domain": "sl.wikipedia.org",
        "stream": "mediawiki.recentchange",
        "topic": "eqiad.mediawiki.recentchange",
        "partition": 0,
        "offset": 2403979241
    },
    "id": 21345744,
    "type": "edit",
    "namespace": 0,
    "title": "Iztrebek",
    "comment": "vrnitev sprememb uporabnika [[Special:Contributions/176.76.241.31|176.76.241.31]] ([[User talk:176.76.241.31|pogovor]]) na zadnje urejanje uporabnika [[User:Samuele2002|Samuele2002]]",
    "timestamp": 1589473467,
    "user": "Upwinxp",
    "bot": false,
    "minor": true,
    "length": {
        "old": 561,
        "new": 1061
    },
    "revision": {
        "old": 5320868,
        "new": 5320869
    },
    "server_url": "https://sl.wikipedia.org",
    "server_name": "sl.wikipedia.org",
    "server_script_path": "/w",
    "wiki": "slwiki",
    "parsedcomment": "vrnitev sprememb uporabnika <a href=\"/wiki/Posebno:Prispevki/176.76.241.31\" title=\"Posebno:Prispevki/176.76.241.31\">176.76.241.31</a> (<a href=\"/w/index.php?title=Uporabni%C5%A1ki_pogovor:176.76.241.31&amp;action=edit&amp;redlink=1\" class=\"new\" title=\"Uporabniški pogovor:176.76.241.31 (stran ne obstaja)\">pogovor</a>) na zadnje urejanje uporabnika <a href=\"/wiki/Uporabnik:Samuele2002\" title=\"Uporabnik:Samuele2002\">Samuele2002</a>"
}