confluentinc / kafka-connect-datagen

Connector that generates data for demos
Apache License 2.0

Cannot use arbitrary AVRO schema file with datagen #22

Closed gnschenker closed 5 years ago

gnschenker commented 5 years ago

Line https://github.com/confluentinc/kafka-connect-datagen/blob/0.1.x/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java#L120 does not allow using an arbitrary Avro schema file at, say, /avro/product.avsc.

You might want to use getClass().getResourceAsStream(schemaFilename) instead...

From the Java documentation:

The methods in ClassLoader use the given String as the name of the resource without applying any absolute/relative transformation (cf. the methods in Class). The name should not have a leading “/”.
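For illustration, a minimal sketch of the two lookup styles (the resource names are only examples, and this is not the connector's code). Note that both methods return null instead of throwing when the resource is missing, which is what later surfaces as the NullPointerException inside Schema.Parser.parse():

import java.io.InputStream;

public class ResourceLookupDemo {
    public static void main(String[] args) {
        // ClassLoader lookup: the name is treated as absolute within the
        // classpath and must NOT start with a leading '/'.
        InputStream viaClassLoader = ResourceLookupDemo.class
                .getClassLoader()
                .getResourceAsStream("product.avsc");

        // Class lookup: without a leading '/' the name is resolved relative to
        // this class's package; a leading '/' makes it classpath-absolute.
        InputStream viaClass = ResourceLookupDemo.class
                .getResourceAsStream("/product.avsc");

        System.out.println("via ClassLoader: " + (viaClassLoader != null));
        System.out.println("via Class:       " + (viaClass != null));
    }
}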

ybyzek commented 5 years ago

Summary of the issue

The goal is to allow the code to access both:

  1. a bundled avro file from the resources subdirectory, as allowed today
  2. a user-defined Avro file from an arbitrary location. The reason: users who download the connector jar files (e.g. from Confluent Hub) rather than building their own jars have no way to copy their user-defined Avro file into the resources subdirectory (if I'm incorrect in this assumption, please let me know).

Right now it appears to work for the first use case but not the second.

Question

How can we get this to work with existing code, or do we need to modify the code somehow?

Test

I tried setting CLASSPATH to include the path to the Avro file, in the hope that it would cover both cases:

export CLASSPATH=/Users/yeva/Desktop/Temp

Then I started Confluent Platform with the Confluent CLI; however, the same error is generated:

[2019-01-04 14:48:32,132] ERROR WorkerSourceTask{id=datagen-users1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
    at org.apache.avro.Schema.parse(Schema.java:1225)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1004)
    at io.confluent.avro.random.generator.Generator.<init>(Generator.java:218)
    at io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:121)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-01-04 14:48:32,133] ERROR WorkerSourceTask{id=datagen-users1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

@rhauch, @mikebin , any suggestions?

Connector config /tmp/datagen-users.json

{
    "name": "datagen-users1",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "users",
      "schema.filename": "users.avro.original",
      "schema.keyfield": "userid",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://localhost:8081",
      "value.converter.schemas.enable": "false",
      "max.interval": 1000,
      "iterations": 10000000,
      "tasks.max": "1"
    }
}

Posted with: curl -X POST -H "Content-Type: application/json" --data @/tmp/datagen-users.json http://localhost:8083/connectors

Note the specific line:

      "schema.filename": "users.avro.original",

I also tried an absolute path:

      "schema.filename": "/Users/yeva/Desktop/Temp/users.avro.original",

mikebin commented 5 years ago

If you want users to be able to specify an external location outside the classpath, then I think two conditional blocks of code might be required - one for checking on the classpath, one for checking an arbitrary local file. Maybe I'm overlooking something obvious.
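Roughly something like this, as a sketch only (SchemaLoader and the lookup order are illustrative, not the connector's actual code):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.avro.Schema;

public final class SchemaLoader {

    // Try the classpath first (bundled resource / plugin path), then fall back
    // to treating the configured name as a path on the worker's local filesystem.
    public static Schema load(String schemaFilename) throws IOException {
        InputStream fromClasspath = Thread.currentThread()
                .getContextClassLoader()
                .getResourceAsStream(schemaFilename);
        if (fromClasspath != null) {
            try (InputStream in = fromClasspath) {
                return new Schema.Parser().parse(in);
            }
        }

        Path localFile = Paths.get(schemaFilename);
        if (Files.exists(localFile)) {
            return new Schema.Parser().parse(localFile.toFile());
        }

        throw new IOException("Schema not found on classpath or filesystem: " + schemaFilename);
    }
}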

To keep things simpler and less dependent on each worker's local filesystem, it might be useful to recommend that users copy their custom Avro schema to a location in the kafka-connect-datagen plugin path, then use one of the following options to load from a location relative to the classpath:

getClass().getResourceAsStream(schemaFilename)

or:

Thread.currentThread().getContextClassLoader().getResourceAsStream(schemaFilename)

Spring has a nice abstraction for loading resources from all kinds of different locations like URLs, local file system, classpath, etc, but not sure if you want to introduce that as a dependency: https://docs.spring.io/spring/docs/5.1.4.BUILD-SNAPSHOT/spring-framework-reference/core.html#resources

ybyzek commented 5 years ago

If you want users to be able to specify an external location outside the classpath...

That isn't a requirement, though I made it confusing because I did write above "a user-defined avro file from any arbitrary location".

Stepping back, it would be helpful to identify two solutions:

  1. works with current code. Why didn't my CLASSPATH setting work above?
  2. works with truly arbitrary path, which likely requires code enhancements

Do you have ideas on why (1) didn't work for me?

ybyzek commented 5 years ago

@mikebin, as an alternative to setting CLASSPATH to an arbitrary location, I've also tried putting the Avro file into the $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-datagen/ path, and that also fails.

ybyzek commented 5 years ago

@mikebin, @gnschenker, I got the following to work with the current code:

  1. Copy your avro file to a directory
  2. Set CONNECT_CLASSPATH to that directory (export CONNECT_CLASSPATH=</path/to/your/avro/file>)
  3. Start Confluent CLI (or restart just connect)
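
In shell form (example paths; the exact commands depend on your Confluent CLI version):

cp your-schema.avsc /path/to/schemas/                 # 1. copy the Avro file to a directory
export CONNECT_CLASSPATH=/path/to/schemas             # 2. point CONNECT_CLASSPATH at that directory
confluent stop connect && confluent start connect     # 3. restart just the connect service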

@gnschenker , can you please verify if this works for you?

ybyzek commented 5 years ago

I have added to the README to explain how to make this work with current code: https://github.com/confluentinc/kafka-connect-datagen#define-a-new-schema-specification via PR #20 . As a result, I believe no further change is necessary.

mamash commented 5 years ago

I have added to the README to explain how to make this work with current code: https://github.com/confluentinc/kafka-connect-datagen#define-a-new-schema-specification via PR #20 . As a result, I believe no further change is necessary.

I wish some of this had helped me, but my connector is still crashing the moment I add a custom schema, no matter what I set in CONNECT_CLASSPATH. I.e. when I do exactly what is suggested in the 527e343 doc change, I still end up with the exception mentioned above.

ybyzek commented 5 years ago

@mamash -- I just verified it works on CP 5.2.1 with kafka-connect-datagen 0.1.0.

For your test, please post the output/values for your connector config and for the schema file that CONNECT_CLASSPATH points to.

mamash commented 5 years ago

Thanks for the help; see below. I also tried using just the directory path rather than the file path for CONNECT_CLASSPATH, and a bare filename vs. a file path for schema.filename.

# curl http://localhost:8083/connectors/datagen-users | jq
{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "value.converter.schemas.enable": "false",
    "name": "datagen-users",
    "kafka.topic": "users",
    "schema.keyfield": "userid",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schema.filename": "/opt/schemas/schema.avsc"
  },
  "tasks": [
    {
      "connector": "datagen-users",
      "task": 0
    }
  ],
  "type": "source"
}

# docker-compose exec connect bash -c 'cat $CONNECT_CLASSPATH' | jq
{
  "namespace": "ksql",
  "name": "users",
  "type": "record",
  "fields": [
    {
      "name": "registertime",
      "type": {
        "type": "long",
        "arg.properties": {
          "range": {
            "min": 1487715775521,
            "max": 1519273364600
          }
        }
      }
    },
    {
      "name": "userid",
      "type": {
        "type": "string",
        "arg.properties": {
          "regex": "User_[1-9]{0,1}"
        }
      }
    },
    {
      "name": "regionid",
      "type": {
        "type": "string",
        "arg.properties": {
          "regex": "Region_[1-9]?"
        }
      }
    },
    {
      "name": "gender",
      "type": {
        "type": "string",
        "arg.properties": {
          "options": [
            "MALE",
            "FEMALE",
            "OTHER"
          ]
        }
      }
    }
  ]
}

ybyzek commented 5 years ago

@mamash I had validated a local CP install, not Docker. Switching over to Docker now, I can reproduce the same behavior and error message as you. I've tried setting both CLASSPATH and CONNECT_CLASSPATH.

cc-ing @confluentinc/connect team for other suggestions

mamash commented 5 years ago

Thanks for confirming.

ybyzek commented 5 years ago

@mamash actually when I did the Docker test, I made the same mistake that is shown in your output above. I just verified Docker works.

Please try the following change; note that the new filename is not absolute. Change:

    "schema.filename": "/opt/schemas/schema.avsc"

to:

    "schema.filename": "schema.avsc"

urspralini commented 5 years ago

@ybyzek I confirmed that a local CP install works with an arbitrary Avro file. It took a while to digest the information from the README. Looking into Docker now.

urspralini commented 5 years ago

@ybyzek I looked into Docker now, and I still see the same NPE described above. The screenshot below shows the configuration, especially CONNECT_CLASSPATH and the existence of the schema file.

[Screenshot: Connect container configuration showing CONNECT_CLASSPATH and the schema file]

Below is the datagen connector config from curl -X GET http://localhost:8083/connectors/datagen-gps-event:

{
  "name": "datagen-gps-event",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "tasks.max": "1",
    "name": "datagen-gps-event",
    "kafka.topic": "gps_events",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "max.interval": "100",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "iterations": "100000",
    "schema.filename": "gps_events.avro"
  },
  "tasks": [
    {
      "connector": "datagen-gps-event",
      "task": 0
    }
  ],
  "type": "source"
}

The schema file name is set to "gps_events.avro". I'm not sure what I am missing.

mamash commented 5 years ago

Please try the following change; note that the new filename is not absolute.

That's one of the variations I tried earlier too, but to no avail. Just tried it again for sanity (re-registered the datagen configuration, restarted the connect container). Still throws an exception no matter what. No idea where we're doing something different.

ybyzek commented 5 years ago

@mamash here are the step-by-step instructions to reproduce what worked in my environment. When you have the time, can you please try these exact steps?

  1. Start with https://github.com/confluentinc/cp-docker-images/tree/5.2.1-post/examples/cp-all-in-one/docker-compose.yml

  2. Make the following changes to docker-compose.yml:

cp-all-in-one(5.2.1-post) ✗: git diff
diff --git a/examples/cp-all-in-one/docker-compose.yml b/examples/cp-all-in-one/docker-compose.yml
index b2011ac..327b420 100644
--- a/examples/cp-all-in-one/docker-compose.yml
+++ b/examples/cp-all-in-one/docker-compose.yml
@@ -60,6 +60,8 @@ services:
       - schema-registry
     ports:
       - "8083:8083"
+    volumes:
+      - $PWD/orders_schema.avro:/tmp/orders_schema.avro
     environment:
       CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
       CONNECT_REST_ADVERTISED_HOST_NAME: connect
@@ -78,6 +80,7 @@ services:
       CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
       CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      CONNECT_CLASSPATH: /tmp
       # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.1.1 Connect image
       CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
       CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"

  3. Download this file to the same directory: https://github.com/confluentinc/kafka-connect-datagen/blob/0.1.x/src/main/resources/orders_schema.avro

  4. Create this file:

cp-all-in-one(5.2.1-post) ✗: cat /tmp/submit_connector_orders.json
{
  "name": "datagen-orders",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "schema.filename": "orders_schema.avro",
    "schema.keyfield": "orderid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}

  5. Bring up the Docker environment with docker-compose up -d --build

  6. When Connect is ready, submit the connector:

curl -X POST -H "Content-Type: application/json" --data @/tmp/submit_connector_orders.json http://localhost:8083/connectors

urspralini commented 5 years ago

@ybyzek It is working for you because the "orders_schema.avro" file is also available in the resources folder of kafka-connect-datagen. When I copy the content of "orders_schema.avro" into another file, say "gps.avro", I'm still getting the NPE. Could you please try the same with a different file name?

ybyzek commented 5 years ago

@urspralini very good catch! You are absolutely correct. I now get an NPE when I change the name :(

ybyzek commented 5 years ago

I've instrumented the connector itself to print the files in the classpath, and the schema file is not found. This is consistent with the behavior we are seeing, but I still can't explain why.

I've tried to place the schema in a few different locations (including /usr/share/java/kafka) and it's not getting picked up anywhere.
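
One way to do that kind of check (an illustrative sketch, not the exact instrumentation used here):

import java.net.URL;
import java.net.URLClassLoader;

public class ClasspathDump {
    public static void main(String[] args) {
        // Print the URLs visible to the context classloader (works when it is a
        // URLClassLoader, e.g. Connect's plugin classloader), plus java.class.path.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl instanceof URLClassLoader) {
            for (URL url : ((URLClassLoader) cl).getURLs()) {
                System.out.println("classpath entry: " + url);
            }
        }
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}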

abbhatia commented 5 years ago

I am facing the same issue. I have tried to put the schema in various places (including the classpath setting mentioned above), but in vain.

ybyzek commented 5 years ago

Here is a PR that addresses the issue (I verified with Docker in my setup): https://github.com/confluentinc/kafka-connect-datagen/pull/25 (It's not a "fix" per se, because I still don't know what the actual problem is; the PR just circumvents it.)

Is anyone here willing to pull the code and confirm?

ybyzek commented 5 years ago

PR #25 has been validated by two users and merged.

The updated kafka-connect-datagen version 0.1.2 has been posted to Confluent Hub: https://www.confluent.io/connector/kafka-connect-datagen/

fred-jan commented 5 years ago

It's working now using a custom schema file referenced by an absolute path. Many thanks for fixing this! :ok_hand:

elabbassiwidad commented 4 years ago

I tried it in my project but it doesn't work. I tried the absolute path and all of the solutions listed above. Any recommendations?

[screenshot]

I tried the absolute path at first, then just the name of the file while exporting CONNECT_CLASSPATH, but I always get the same error (it can't read the file):

[screenshot]

I would really appreciate someone's help.

C0urante commented 4 years ago

@elabbassiwidad which version are you running?

elabbassiwidad commented 4 years ago

@elabbassiwidad which version are you running?

I am running the same docker-compose.yml script, so for Connect I'm using the image confluentinc/kafka-connect-datagen:0.2.0. I already tried version 0.3, but I always get the same error (No such file or Directory). I don't know what I am missing.

C0urante commented 4 years ago

@elabbassiwidad have you copied the schema file onto the docker container you're running, at the path used for the schema.filename property in the connector config?

elabbassiwidad commented 4 years ago

@elabbassiwidad have you copied the schema file onto the docker container you're running, at the path used for the schema.filename property in the connector config?

I don't understand. The schema file sensor.avsc is on my local system, as is the connector config. I did not place the file inside any container. Do I need to create the file inside a container? If yes, in which container?

C0urante commented 4 years ago

If you're running the connector inside a docker container, you need to copy the schema file into that container. If you're using this docker-compose file, that'd be the container named connect.
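
For example (illustrative paths, using the schema file mentioned above):

docker cp sensor.avsc connect:/tmp/sensor.avsc

Alternatively, mount the file as a volume in docker-compose.yml (as shown earlier in this thread) and point schema.filename at that path.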

elabbassiwidad commented 4 years ago

If you're running the connector inside a docker container, you need to copy the schema file into that container. If you're using this docker-compose file, that'd be the container named connect.

Finally, it worked, thank you so much! I placed it in the broker and schema registry containers before, but didn't think it needed to be placed in the connect container. Thank you again; I've been stuck on this problem for more than 5 days.

C0urante commented 4 years ago

That's great to hear! Glad I could help 😃

MED-SALAH commented 4 years ago

Hello @ybyzek, can I use Datagen to generate data from a nested Avro schema like this example? It is not working for me with this type of structure, where the fields contain other records: { "type" : "record", "name" : "data" "fields" : [ {"name": "id", "type": "string"}, {"name": "code", "type": "string"}, {"name": "referer", "type": [ { "type" : "record", "name" : "refer1", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]}, ] }, { "type" : "record", "name" : "refer2", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]}, ] } ] } ] } Thanks

elabbassiwidad commented 4 years ago

Hello @ybyzek, can I use Datagen to generate data from a nested Avro schema like this example? It is not working for me with this type of structure, where the fields contain other records: { "type" : "record", "name" : "data" "fields" : [ {"name": "id", "type": "string"}, {"name": "code", "type": "string"}, {"name": "referer", "type": [ { "type" : "record", "name" : "refer1", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]}, ] }, { "type" : "record", "name" : "refer2", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]}, ] } ] } ] } Thanks

I'm not an expert in Avro, but first I think you should remove the comma at the end of the fields, because it is normally used to separate attributes, and since the last attribute has nothing following it, there is no need for a comma:

{ "type" : "record", "name" : "data", "fields" : [ {"name": "id", "type": "string"}, {"name": "code", "type": "string"}, {"name": "referer", "type": [ { "type" : "record", "name" : "refer1", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]} ] }, { "type" : "record", "name" : "refer2", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]} ] } ] } ] }

MED-SALAH commented 4 years ago

Thank you @elabbassiwidad. Yes, I corrected it; here it is just an example, not my real case, but it still does not work. It only works if I remove the record from the fields, and I don't know why: https://github.com/confluentinc/avro-random-generator

elabbassiwidad commented 4 years ago

Thank you @elabbassiwidad. Yes, I corrected it; here it is just an example, not my real case, but it still does not work. It only works if I remove the record from the fields, and I don't know why: https://github.com/confluentinc/avro-random-generator

Well, I'm not familiar with nested Avro schemas, but I just saw an example: https://docs.informatica.com/data-integration/powerexchange-cdc-publisher/1-1/user-guide/avro-schema-formats/avro-nested-schema-format.html

And if your goal is to create multiple columns in each record, which I believe is your case (referer has multiple columns: refer1[num, add], refer2[num, add] ...), then I think the way you defined the schema is wrong, and what you're looking for is more something like this: https://docs.informatica.com/data-integration/powerexchange-cdc-publisher/1-1/user-guide/avro-schema-formats/avro-generic-schema-format.html

And again, I'm not sure.

MED-SALAH commented 4 years ago

@elabbassiwidad I tried with this but nothing happened: { "type" : "record", "name" : "data", "fields" : [ {"name": "id", "type": "string"}, {"name": "code", "type": "string"}, {"name": "referer", "type": [ { "name" : "refer1", "type" : "record", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]} ] }, { "name" : "refer2", "type" : "record", "fields" : [ {"name": "num", "type": "string"}, {"name": "add", "type": ["string","null"]} ] } ] } ] }

MED-SALAH commented 4 years ago

https://docs.informatica.com/data-integration/powerexchange-cdc-publisher/1-1/user-guide/avro-schema-formats/avro-nested-schema-format.html

MED-SALAH commented 4 years ago

@elabbassiwidad I have a producer, but I need to generate more intensive data for simulation using the Datagen connector provided by Confluent.

zaidyahya commented 4 years ago

@MED-SALAH

Were you able to create the complex data for your use-case? I'm trying to do the same but unable to construct an array of records.

My aim is a structure like this,

class Event {
    String name
    Address address
    List<Reference> references
}

where

class Address {
    String street
}

class Reference {
    String age
}

My Avro schema looks like this:

{
        "namespace": "ksql",
        "name": "event",
        "type": "record",
        "fields": [
            {   "name": "name", 
                "type": {
                    "type": "string",
                    "arg.properties": {
                        "options": [
                            "JaneDoe",
                            "TestUser"
                        ]
                    }
                }
            },
            {   "name": "address",
                "type": {
                    "name": "address",
                    "type": "record",
                    "fields": [
                        {
                            "name": "street",
                            "type": {
                                "type": "string",
                                "arg.properties": {
                                    "regex":"Account_[1-9]{0,2}"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": references,
                "type": {
                    "type": "array",
                    "items": {
                        "name": "reference",
                        "type": {
                            "name": "reference",
                            "type": "record",
                            "fields": [
                                {
                                    "name": "age",
                                    "type": {
                                        "type": "string",
                                        "arg.properties": {
                                            "options": [
                                                "13", "14"
                                            ]
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        ]
}

I get the error No type: {"name":"reference","type":{"name":"reference","type":"record","fields":[{"name":"age","type":{"type":"string","arg.properties":{"options":["13","14"]}}}]}}

I have tried different ways of defining the array, such as removing the first name and type in items. But that gets me this error: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class org.apache.avro.generic.GenericData$Record for field: "null"

A single record property works fine (i.e. the address property), but I can't construct an array of records. Could anyone point out what I'm doing wrong?
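
For reference, here is the shape I believe the Avro spec expects for the references field, with the array's items value being the record schema itself rather than a name/type wrapper (an untested sketch):

{
    "name": "references",
    "type": {
        "type": "array",
        "items": {
            "name": "reference",
            "type": "record",
            "fields": [
                {
                    "name": "age",
                    "type": {
                        "type": "string",
                        "arg.properties": {
                            "options": ["13", "14"]
                        }
                    }
                }
            ]
        }
    }
}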