lensesio / fast-data-dev

Kafka Docker for development. Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors
https://lenses.io
Apache License 2.0
2.02k stars 331 forks source link

unexpected schema not found error #22

Closed costa closed 7 years ago

costa commented 7 years ago

Hi, I'm still trying to use this docker image, with a trivial goal of connecting Kafka to ElasticSearch. I'm using http://docs.datamountaineer.com/en/latest/elastic.html — which I don't seem to find a (github) repo for. The most recent issue which I'm stuck with is very much like https://github.com/confluentinc/kafka-connect-elasticsearch/issues/59

I'm not sure I'm up to debugging the connector, which also kinda defies the purpose of your work... Any tips or references will be appreciated. Thanks!

stheppi commented 7 years ago

@costa, The github repo for the source code is here:https://github.com/datamountaineer/stream-reactor Here is the link to our testing and example: http://coyote.landoop.com/connect/kafka-connect-elasticsearch/

Now to your error. I am pretty sure your ruby code doesn't do the right thing when registering the schema. Before i go further, Avro is the way to go so it's not unfortunately you are doing the right thing. Let's say your topic is called magic-elastic. Now your ruby component needs to register two entries in schema registry: magic-elastic-keyand magic-elastic-value. You can easily check on the schema registry endpoint: http://machine:8080/subjects

Let us know your findings and how we can help further.

costa commented 7 years ago

Hey @stheppi This is the schema which is registered with the avro component (as seen from within a fast-data-dev container):

$ curl localhost:8081/subjects/com.bandmanage.RawTelemetry/versions
[1]
$ curl localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"RawTelemetry\",\"namespace\":\"com.bandmanage\",\"fields\":[{\"name\":\"deviceType\",\"type\":\"string\"},{\"name\":\"deviceId\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"},{\"name\":\"json\",\"type\":\"string\"}]}"}

So, I don't see a reason for the error after I wait for the schema to be present prior to bringing up the ElasticSinkConnector.

Antwnis commented 7 years ago

Hi @costa the error is in the details. You subject is called com.bandmanage.RawTelemetry

It should instead have either the -value suffix or -key suffix to indicate whether it's the schema of the K(ey) or V(alue)

Register the schema as com.bandmanage.RawTelemetry-value and it should most prob work out :)

To be a bit more precise - the serializers/deserializers expect the schema to match

TOPICNAME-value or TOPICNAME-key

So com.bandmanage.RawTelemetry-value will work only if your topic is called com.bandmanage.RawTelemetry

andmarios commented 7 years ago

And the subject name, as @stheppi said, should be the topic name plus the suffix.

costa commented 7 years ago

@Antwnis Hey, thanks, I'll try that ASAP, however, the original error would be quite confusing then, because it clearly declares failure fetching the schema with the id=1. I'm defining which schema to use when configuring the connector, so I will be surprised (which is a rather bad thing) if your suggestion will really remedy the situation. Cc @andmarios

stheppi commented 7 years ago

We migt do a PR into Confluent serializer to make it explicit why the id is not found. Is either the schema registry is unreachable or the entries are not registered correctly like in your case.

Antwnis commented 7 years ago

@costa Talking about elastic-search we just published a nice blog regarding Kafka-Connect & the particular connector you are trying out. Maybe it's of interest --> http://www.landoop.com/blog/2017/04/kafka-connect-elasticsearch/

costa commented 7 years ago

@stheppi et al. I tried changing the name of the schema, first, without changing the 'name' in the schema, so the schema was:

{
    "type": "record",
    "name": "RawTelemetry",
    "doc": "Sensor readings as sent from terminal devices",
    "namespace": "com.bandmanage",
    "fields": [
        {
            "name": "deviceType",
            "type": "string"
        },
        {
            "name": "deviceId",
            "type": "string"
        },
        {
            "name": "ts",
            "type": "long"
        },
        {
            "name": "json",
            "type": "string"
        }
    ]
}

and the avro_turf ruby gem didn't accept it. Then I changed the "name" in the schema to "RawTelemetry-value", and the schema registry POST — by the same gem, automatically, when producing a message gave me 422 (probably because "name" could not contain dashes). Note that originally, the schema was registered successfully. I tried digging any info on the necessity of the "-value" suffix, including datamountaineer's source code, but couldn't find anything decisive. The connector is configured (as actually fetched from the fast-data-dev mgmt API):

{
  "connector.class": "com.datamountaineer.streamreactor.connect.elastic.ElasticSinkConnector",
  "type.name": "com.bandmanage.RawTelemetry",
  "connect.elastic.sink.kcql": "INSERT INTO raw-telemetry SELECT * FROM RawTelemetry",
  "tasks.max": "1",
  "topics": "RawTelemetry",
  "connect.elastic.url": "elasticsearch://elasticsearch:9300",
  "name": "RawTelemetry-ElasticSinkConnector",
  "connect.elastic.cluster.name": "elasticsearch"
}

... I've just realised ElasticSinkConfig might not really use "type.name" parameter (despite gladly having it in the config) and what you were saying the schema name must actually be suffixed with "-value" makes more sense to me now. Well if this is the case, this connector is not compatible with the major ruby kafka messaging implementation, and I'll have to try and use the Confluent's Elastic connector...

stheppi commented 7 years ago

@costa You should first have a read at https://github.com/confluentinc/schema-registry and http://docs.confluent.io/3.2.0/schema-registry/docs/. On confluent page go and read about subjects and compatibility.

The schema name can't have dashes inded. And here is where the confusion is. I hope you do read the above before reading this :)

Example: You push data in avro to a kafka topic. Both the key and the value of a message is avro and you want to rely onschema registry (industry standard). The serializer (not sure what your ruby lib does) would check two entries in schema registey subjects. Http://..:8081/subjects/RawTelemetry-key and htpp://..8081/subjects/RawTelemetry-value to get their id and latest version (for the schema). This is done so that the avro values you are trying to publish are in sync with the required schema. If they are not there would be an error if all is good the key and the value will be serialized to avro (binary) . The byte arrays produced will have a header each containing the id for the two subjects. When deserialization happens it reads the header gets the schema id for key and value goes to the schema registry( if it doesn't have the dat in the cache) and pulls the schemas. Based on the schema it reads the binary and return an avro instance.

We don't support ruby frameworks and don't think confluent does either. So a lib you use needs to do all the above to join the Kafka ecosystem. Alternatively you can always publish your data via kafk -rest proxy which does all that at the price of sending data via http before it's send to kafka.

I hope by now this ecosystem is a lot clear to you. Let us know how it goes.

stheppi commented 7 years ago

You can also go with Json payloads if your lib doesn't handle schemas registry correctly. The sink supports that as well. We don't advise going away from avro but it is an option

costa commented 7 years ago

@stheppi thank you again. I've filed an issue with avro_turf (above), but I'm not sure it will be resolved with the libs I'm using.

So one direction for me is to tail Mongo (e.g. with a to-be-written MongoProducer Scala microservice) or use some other mediator which supports ruby (yes, I still think (j)ruby should be able to talk to Kafka, and no, I don't have too much time to invest in that support myself, unfortunately).

Another direction is to go with schema-less JSON, so I'm really interested in the option you've mentioned: how can I config the ElasticSink to go schema-less?

Antwnis commented 7 years ago

@costa You want to tail Mongo ? Have you had a look at http://debezium.io/docs/connectors/mongodb/ ? Maybe we should bring this connector into the fast-data-dev distribution / docker WDYT ?

costa commented 7 years ago

@Antwnis Hey, this connector looks really cool. Generally, I would like a setup where all events are recorded, so tailing a DB is a decent solution. MongoDB may not serve this purpose in the long run, but that's what we have with our pre-Kafka architecture. It will be really great if you absorb the connector in your Docker images, which should not be too hard with the expertise of yours. Thanks.

Antwnis commented 7 years ago

@costa Great - we are planning on a release to support 3.2 version of CP this week - that includes new connectors / new ui capabilities etc etc

Based on the road-map on the release after this one we will be bringing a loot of CDC (Change Data Capture) Kafka Connectors - including the family of Debezium connectors.

costa commented 7 years ago

Thanks all, my immediate issue is resolved, through the avro_turf ruby lib's undocumented option, so yes, ruby can talk to Kafka as of this moment.

You may close this issue or wait until that PR @stheppi mentioned is merged.

And I'll be sure to check the coming release with Mongo tailing producer once I have the architecture buzzing and will be working on stability+performance.

andmarios commented 7 years ago

Hello, I close the issue as it seems resolved. Feel free to comment or open a new one if you need further help.

Thanks for reporting the issue and following up!