jaredpetersen / kafka-connect-arangodb

🥑 Kafka Connect sink connector for ArangoDB
https://www.confluent.io/connector/kafka-connect-arangodb/
MIT License

Raw JSON import causes java.lang.NullPointerException #12

Open ttrading opened 3 years ago

ttrading commented 3 years ago

I just downloaded ArangoDB 3.7.10 and the kafka-connect-arangodb sink connector onto the same server as our Kafka broker to test the integration (OS: Ubuntu 20). I followed the quick start guide to start everything, but I get an error when I produce the following JSON message to the designated topic:

{ "type": "Point", "time": 1560507792000, "value": 5 }

My quickstart-kafka-connect-arangodb-sink property file is as follows:

# Kafka settings
name=arangodb-sink
connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector
tasks.max=1

# Topics to consume from (comma-separated for a list of multiple topics)
topics=test

# ArangoDB sink configuration
arangodb.host=10.10.10.195
arangodb.port=8529
arangodb.user=root
arangodb.password=xxxxxx
arangodb.database.name=webperformance

# Optional transformers
#transforms=cdc
#transforms.cdc.type=io.github.jaredpetersen.kafkaconnectarangodb.sink.transforms.Cdc

In my standalone Kafka Connect worker config file, I set the converter schemas.enable options to false for both key and value.
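
The relevant worker converter settings look something like this (a sketch; the exact file may differ):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

I wish I could give more details, but the following is the only output I see from Kafka Connect: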

 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2021-04-06 20:02:14,583] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = connector-consumer-arangodb-sink-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-arangodb-sink
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:361)
[2021-04-06 20:02:14,633] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-06 20:02:14,633] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2021-04-06 20:02:14,633] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoParser:120)
[2021-04-06 20:02:14,633] INFO Kafka startTimeMs: 1617739334633 (org.apache.kafka.common.utils.AppInfoParser:121)
[2021-04-06 20:02:14,646] INFO Created connector arangodb-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2021-04-06 20:02:14,647] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Subscribed to topic(s): test (org.apache.kafka.clients.consumer.KafkaConsumer:961)
[2021-04-06 20:02:14,648] INFO task config: {connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector, arangodb.port=8529, arangodb.database.name=webperformance, task.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask, tasks.max=1, topics=test, arangodb.host=10.10.10.195, name=arangodb-sink, arangodb.password=xxxxxx, arangodb.user=root} (io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask:41)
[2021-04-06 20:02:14,648] INFO initial config: {connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector, arangodb.port=8529, arangodb.database.name=webperformance, task.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask, tasks.max=1, topics=test, arangodb.host=10.10.10.195, name=arangodb-sink, arangodb.password=xxxxxx, arangodb.user=root} (io.github.jaredpetersen.kafkaconnectarangodb.sink.config.ArangoDbSinkConfig:56)
[2021-04-06 20:02:14,726] INFO JsonConverterConfig values:
        converter.type = value
        decimal.format = BASE64
        schemas.cache.size = 1000
        schemas.enable = false
 (org.apache.kafka.connect.json.JsonConverterConfig:361)
[2021-04-06 20:02:14,728] INFO WorkerSinkTask{id=arangodb-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)
[2021-04-06 20:02:14,751] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Cluster ID: L0KJ7-s_RpScEldM7-SUTw (org.apache.kafka.clients.Metadata:279)
[2021-04-06 20:02:14,752] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Discovered group coordinator questdb:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:847)
[2021-04-06 20:02:14,755] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2021-04-06 20:02:14,771] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2021-04-06 20:02:14,775] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:596)
[2021-04-06 20:02:14,777] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Finished assignment for group at generation 1: {connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7=Assignment(partitions=[test-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:626)
[2021-04-06 20:02:14,783] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
[2021-04-06 20:02:14,783] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Notifying assignor about the new Assignment(partitions=[test-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[2021-04-06 20:02:14,786] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Adding newly assigned partitions: test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[2021-04-06 20:02:14,793] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Found no committed offset for partition test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1354)
[2021-04-06 20:02:14,805] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Resetting offset for partition test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[questdb:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
[2021-04-06 20:03:09,353] INFO writing 1 record(s) (io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask:74)
[2021-04-06 20:03:09,355] ERROR WorkerSinkTask{id=arangodb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
java.lang.NullPointerException
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.getKey(RecordConverter.java:69)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.convert(RecordConverter.java:42)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.lambda$put$0(ArangoDbSinkTask.java:78)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.put(ArangoDbSinkTask.java:79)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2021-04-06 20:03:09,358] ERROR WorkerSinkTask{id=arangodb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.getKey(RecordConverter.java:69)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.convert(RecordConverter.java:42)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.lambda$put$0(ArangoDbSinkTask.java:78)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.put(ArangoDbSinkTask.java:79)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more
[2021-04-06 20:03:09,360] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Revoke previously assigned partitions test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[2021-04-06 20:03:09,360] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Member connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7 sending LeaveGroup request to coordinator questdb:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
[2021-04-06 20:03:09,367] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-06 20:03:09,367] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-06 20:03:09,367] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-06 20:03:09,374] INFO App info kafka.consumer for connector-consumer-arangodb-sink-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
^C[2021-04-06 20:03:37,502] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
[2021-04-06 20:03:37,512] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:327)
[2021-04-06 20:03:37,518] INFO Stopped http_8083@66d23e4a{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:381)
[2021-04-06 20:03:37,518] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:149)
[2021-04-06 20:03:37,523] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:344)
[2021-04-06 20:03:37,524] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:102)
[2021-04-06 20:03:37,525] INFO Stopping task arangodb-sink-0 (org.apache.kafka.connect.runtime.Worker:836)
[2021-04-06 20:03:37,532] INFO Stopping connector arangodb-sink (org.apache.kafka.connect.runtime.Worker:387)
[2021-04-06 20:03:37,532] INFO Scheduled shutdown for WorkerConnector{id=arangodb-sink} (org.apache.kafka.connect.runtime.WorkerConnector:249)
[2021-04-06 20:03:37,533] INFO Completed shutdown for WorkerConnector{id=arangodb-sink} (org.apache.kafka.connect.runtime.WorkerConnector:269)
[2021-04-06 20:03:37,533] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:209)
[2021-04-06 20:03:37,534] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66)
[2021-04-06 20:03:37,534] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-06 20:03:37,534] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-06 20:03:37,545] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-06 20:03:37,546] INFO App info kafka.connect for 127.0.1.1:8083 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-04-06 20:03:37,546] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:230)
[2021-04-06 20:03:37,547] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:120)
[2021-04-06 20:03:37,547] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:72)
jaredpetersen commented 3 years ago

Can you provide the exact Kafka record that you're sending?

If it's literally { "type": "Point", "time": 1560507792000, "value": 5 }, the problem is that you are sending Kafka records without a key.
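
If you want to test with a key from the command line, the console producer can parse one out of each input line, something like:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test \
  --property parse.key=true \
  --property "key.separator=|"
> mykey|{ "type": "Point", "time": 1560507792000, "value": 5 }

Here mykey (an arbitrary example) becomes the record key and everything after the | becomes the record value.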

Kafka Connect ArangoDB currently relies on the Kafka record key to create documents. However, it looks like this isn't actually required by ArangoDB, so this is something we'll have to fix.
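
To illustrate the failure mode (this is a sketch of the suspected behavior, not the connector's actual code): the sink derives the ArangoDB document _key from the record key, so dereferencing the key of a keyless record blows up.

import org.apache.kafka.connect.sink.SinkRecord;

public class NullKeyDemo {
    public static void main(String[] args) {
        // A record produced without a key arrives at the sink with key == null
        SinkRecord record = new SinkRecord(
                "test", 0,
                null, null, // key schema and key are both null
                null, "{ \"type\": \"Point\", \"time\": 1560507792000, \"value\": 5 }",
                0L);

        // If the document _key is derived by dereferencing the record key
        // (e.g. record.key().toString()), a keyless record fails here
        String documentKey = record.key().toString(); // NullPointerException
        System.out.println(documentKey);
    }
}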