fbascheper / kafka-connect-telegram

Kafka-connect telegram connector
Apache License 2.0

Failure: scala.MatchError: null #10

Open rmoff opened 5 years ago

rmoff commented 5 years ago

I get this error:

scala.MatchError: null
  at com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper.mapTelegramMessage$1(TelegramMessageMapper.scala:140)
  at com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper.mapAvroStructMessage$1(TelegramMessageMapper.scala:133)
  at com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper.map(TelegramMessageMapper.scala:158)
  at com.github.fbascheper.kafka.connect.telegram.TelegramSinkTask$$anonfun$put$1.apply(TelegramSinkTask.scala:57)
  at com.github.fbascheper.kafka.connect.telegram.TelegramSinkTask$$anonfun$put$1.apply(TelegramSinkTask.scala:57)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at com.github.fbascheper.kafka.connect.telegram.TelegramSinkTask.put(TelegramSinkTask.scala:57)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Config:

{
  "name": "sink-telegram-delays-over-30min-v00",
  "config": {
    "connector.class": "com.github.fbascheper.kafka.connect.telegram.TelegramSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "TRAINS_DELAYED_OVER_30MIN",
    "telegram.bot.name":"Kafka Connect Bot",
    "telegram.bot.username":"rmoff_connect_02_bot",
    "telegram.bot.api.key":"<key>",
    "telegram.bot.destination.chat.id":"-364377679"
  }
}

rmoff commented 5 years ago

Seems to be an issue with Avro. See below.

JSON

Source data:

kafkacat -b localhost:9092 -t test -C
{"hello": "world"}

Config:

curl -i -X POST -H  "Content-Type:application/json" http://localhost:38083/connectors/ \
      -d '{
    "name": "sink-telegram-test-json",
    "config": {
      "connector.class": "com.github.fbascheper.kafka.connect.telegram.TelegramSinkConnector",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "errors.tolerance":"all",
      "errors.deadletterqueue.topic.name":"my_dlq",
      "errors.deadletterqueue.topic.replication.factor": 1,
      "topics": "test",
      "telegram.bot.name":"Kafka Connect Bot",
      "telegram.bot.username":"rmoff_connect_02_bot",
      "telegram.bot.api.key":"xxx",
      "telegram.bot.destination.chat.id":"-364377679"
    }
  }'

Message sent, and log file shows:

INFO Using text to send telegram text-message with contents = {hello=world} (com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper)
INFO Successfully sent message with assigned message-id ArrayBuffer(7) and text '{hello=world}'  (com.github.fbascheper.kafka.connect.telegram.TelegramSinkTask)

Avro

Send the data:

$ kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test2_avro \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"hello","type":"string"}]}'
{"hello":"world"}

Check the data:

$ kafka-avro-console-consumer \
          --bootstrap-server localhost:9092 \
          --property schema.registry.url=http://localhost:8081 \
          --topic test2_avro --from-beginning|jq -c '.'
{"hello":"world"}
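
To see which schema the producer registered for this topic, it can also be fetched from the Schema Registry REST API. This is a minimal sketch: the function names are made up for illustration, the registry URL is the one passed to the console consumer above, and the subject name assumes the default <topic>-value naming.

```shell
# Assumption: Schema Registry at the URL used by the console consumer above.
SCHEMA_REGISTRY_URL="http://localhost:8081"

# Build the URL of the latest value schema for a topic, assuming the
# default subject naming (<topic>-value).
value_schema_url() {
  printf '%s/subjects/%s-value/versions/latest\n' "$SCHEMA_REGISTRY_URL" "$1"
}

# Fetch the latest registered value schema for a topic.
latest_value_schema() {
  curl -s "$(value_schema_url "$1")"
}
```

Usage: latest_value_schema test2_avro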

Config:

curl -i -X POST -H  "Content-Type:application/json" http://localhost:38083/connectors/ \
      -d '{
    "name": "sink-telegram-avro2",
    "config": {
      "connector.class": "com.github.fbascheper.kafka.connect.telegram.TelegramSinkConnector",
      "errors.tolerance":"all",
      "errors.deadletterqueue.topic.name":"my_dlq",
      "errors.deadletterqueue.topic.replication.factor": 1,
      "topics": "test2_avro",
      "telegram.bot.name":"Kafka Connect Bot",
      "telegram.bot.username":"rmoff_connect_02_bot",
      "telegram.bot.api.key":"xxx",
      "telegram.bot.destination.chat.id":"-364377679"
    }
  }'

Log error ("Found TgMessage with type null" suggests something's not right):

INFO Converting struct back to model (com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper)
INFO Found TgMessage with type null (com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper)
ERROR WorkerSinkTask{id=sink-telegram-avro2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
scala.MatchError: null
  at com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper.mapTelegramMessage$1(TelegramMessageMapper.scala:140)
  at com.github.fbascheper.kafka.connect.telegram.mapper.TelegramMessageMapper.mapAvroStructMessage$1(TelegramMessageMapper.scala:133)
[…]
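
Since the log says the task stays dead until it is manually restarted, the Kafka Connect REST API can be used to check the task state and restart it. This is a minimal sketch: the function names are made up for illustration, and the host and port are the ones used in the curl commands above.

```shell
# Assumption: Kafka Connect REST API on the host/port used in this issue.
CONNECT_URL="http://localhost:38083"

# Build the status URL for a connector (GET /connectors/{name}/status).
status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Build the restart URL for one task
# (POST /connectors/{name}/tasks/{id}/restart).
restart_url() {
  printf '%s/connectors/%s/tasks/%s/restart\n' "$CONNECT_URL" "$1" "$2"
}

# Fetch connector and task state; a FAILED task reports its stack trace.
connector_status() {
  curl -s "$(status_url "$1")"
}

# Ask Connect to restart a single failed task.
restart_task() {
  curl -s -X POST "$(restart_url "$1" "$2")"
}
```

Usage: connector_status sink-telegram-avro2, then restart_task sink-telegram-avro2 0. A restart on its own presumably just reproduces the error for as long as the same record is the next one the task reads.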

I tried the same config with "enhanced.avro.schema.support": "true" added (because I found it here), but it makes no difference: I get the same error.