confluentinc / cp-demo

Confluent Platform Demo including Apache Kafka, ksqlDB, Control Center, Schema Registry, Security, Schema Linking, and Cluster Linking
Apache License 2.0

Wikipedia/Wikimedia stream "canary" records cause NullPointerException due to missing "bot" field #451

Open javabrett opened 10 months ago


Description

A while back, the Wikimedia event stream (at least recentchange) started including "canary" records to help with heartbeat/keepalive. Helper libraries were updated to filter these out automatically. Canary records can be identified by meta.domain == 'canary'. See https://phabricator.wikimedia.org/T266798
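Here is a minimal sketch of that check. It assumes the event has already been deserialized into a java.util.Map keyed by the raw (lowercase) field names. CanaryCheck and isCanary are hypothetical names and are not part of cp-demo or any Wikimedia library:

```java
import java.util.Map;

// Hypothetical helper (not part of cp-demo): recognizes Wikimedia "canary" heartbeat events.
final class CanaryCheck {

    private CanaryCheck() {}

    // True when event.meta.domain equals "canary".
    // Assumes the event is a generic Map built by some JSON deserializer,
    // using the raw lowercase field names from the Wikimedia stream.
    static boolean isCanary(Map<String, ?> event) {
        if (event == null) {
            return false;
        }
        Object meta = event.get("meta");
        if (!(meta instanceof Map)) {
            return false; // no meta object at all: not a canary by this rule
        }
        Object domain = ((Map<?, ?>) meta).get("domain");
        return "canary".equals(domain);
    }

    public static void main(String[] args) {
        // Illustrative inputs only, trimmed to the fields the check reads.
        Map<String, Object> canary = Map.of(
                "meta", Map.of("domain", "canary", "stream", "mediawiki.recentchange"));
        Map<String, Object> edit = Map.of(
                "bot", false,
                "meta", Map.of("domain", "en.wikipedia.org", "stream", "mediawiki.recentchange"));
        System.out.println(isCanary(canary)); // true
        System.out.println(isCanary(edit));   // false
    }
}
```

After our connector parses the record, the same field shows up as META.DOMAIN (see the sample record further down), so the key names would need adjusting at that stage.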

These canary records are sparse. In particular, they do not include the boolean "bot" data field that is present on all other records. This causes a NullPointerException when the stream filters on the bot field as a boolean:

[wikipedia-activity-monitor-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskExecutor - stream-thread [wikipedia-activity-monitor-StreamThread-1] Failed to process stream task 0_1 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=wikipedia.parsed, partition=1, offset=1139, stacktrace=java.lang.NullPointerException
        at io.confluent.demos.common.wiki.WikipediaActivityMonitor.lambda$createMonitorStream$1(WikipediaActivityMonitor.java:119)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)

        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: java.lang.NullPointerException
        at io.confluent.demos.common.wiki.WikipediaActivityMonitor.lambda$createMonitorStream$1(WikipediaActivityMonitor.java:119)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        ... 6 more
[wikipedia-activity-monitor-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [wikipedia-activity-monitor] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
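The trace points at the filter lambda at WikipediaActivityMonitor.java:119. One plausible cause is auto-unboxing: if the lambda reads the bot flag as a boxed Boolean and uses it as a primitive boolean, the read throws NullPointerException when the field is null. This is an assumption, since the actual line was not checked here. The sketch below reproduces that pattern in plain Java and shows a null-safe alternative. BotFilterDemo, Change and their methods are hypothetical and only illustrate the idea:

```java
import java.util.List;

class BotFilterDemo {

    // Hypothetical stand-in for a parsed recentchange record. The flag is a boxed
    // Boolean because canary events carry no "bot" field, so it can be null.
    static final class Change {
        final String domain;
        final Boolean bot;

        Change(String domain, Boolean bot) {
            this.domain = domain;
            this.bot = bot;
        }
    }

    // Suspected failing pattern: returning a Boolean as boolean auto-unboxes it,
    // which throws NullPointerException when the field is missing.
    static boolean unsafeIsBot(Change c) {
        return c.bot;
    }

    static boolean isCanary(Change c) {
        return "canary".equals(c.domain);
    }

    // Null-safe: Boolean.TRUE.equals(null) is false, and no unboxing happens.
    static boolean isBot(Change c) {
        return Boolean.TRUE.equals(c.bot);
    }

    public static void main(String[] args) {
        Change canary = new Change("canary", null);
        Change edit = new Change("en.wikipedia.org", Boolean.TRUE);

        try {
            unsafeIsBot(canary);
        } catch (NullPointerException e) {
            System.out.println("unsafe check threw NullPointerException");
        }

        // Drop canaries first, then apply the null-safe bot check to real edits.
        for (Change c : List.of(canary, edit)) {
            if (isCanary(c)) {
                System.out.println(c.domain + ": skipped (canary)");
            } else {
                System.out.println(c.domain + ": bot=" + isBot(c));
            }
        }
    }
}
```

In the Kafka Streams topology, the equivalent would presumably be a KStream.filterNot on the canary check ahead of the existing bot filter, plus a null-safe comparison inside that filter. Dropping canaries explicitly keeps them from being counted as non-bot edits, which is what a plain "treat null as false" fix would do.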

Here is a sample canary record as parsed by our connector:

{
  "BOT": null,
  "COMMENT": null,
  "ID": null,
  "LENGTH": null,
  "LOG_ACTION": null,
  "LOG_ACTION_COMMENT": null,
  "LOG_ID": null,
  "LOG_TYPE": null,
  "META": {
    "DOMAIN": "canary",
    "DT": 1704674745330,
    "ID": "343e51c8-035e-44e1-9f2f-d755afcd0cf1",
    "REQUEST_ID": "374ea0fa-3ac0-49e4-a656-9454a5d59c06",
    "STREAM": "mediawiki.recentchange",
    "URI": null
  },
  "MINOR": null,
  "NAMESPACE": null,
  "PARSEDCOMMENT": null,
  "PATROLLED": null,
  "REVISION": null,
  "SERVER_NAME": null,
  "SERVER_SCRIPT_PATH": null,
  "SERVER_URL": null,
  "TIMESTAMP": null,
  "TITLE": null,
  "TYPE": null,
  "USER": null,
  "WIKI": null
}

You can also observe these canary records with:

curl -s https://stream.wikimedia.org/v2/stream/recentchange | grep canary
data: {"$schema":"/mediawiki/recentchange/1.0.1","meta":{"stream":"mediawiki.recentchange","domain":"canary","id":"a691352b-9c23-4001-ab62-b373aa521904","dt":"2024-01-07T22:15:41.978Z","request_id":"7db98c7f-fbbf-4436-a912-ff0a5e2eaa0e","topic":"codfw.mediawiki.recentchange","partition":0,"offset":910279214}}
data: {"$schema":"/mediawiki/recentchange/1.0.1","meta":{"stream":"mediawiki.recentchange","domain":"canary","id":"d27abe41-c87e-4949-a9a1-14408d5323eb","dt":"2024-01-07T22:15:41.912Z","request_id":"5dd83696-ae8c-4da4-b216-ff4095b0206d","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":4957728053}}

Canary events are infrequent. In this run they only appeared after curl had been running for about 12 minutes, so the command may need to run for a while.
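For the same check from Java instead of curl, the sketch below scans Server-Sent Events lines (the "data:" lines shown above) and keeps the payloads that carry "domain":"canary". It matches the string against the compact JSON exactly as the stream emits it. That is a shortcut, and a real consumer should parse the JSON and read meta.domain. Reading the live feed (for example with java.net.http.HttpClient and HttpResponse.BodyHandlers.ofLines()) is left out so the example runs offline. CanaryLineFilter and canaryPayloads are hypothetical names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: picks canary events out of an SSE line stream.
class CanaryLineFilter {

    private static final String DATA_PREFIX = "data: ";
    // Assumes compact JSON (no whitespace around ':'), as in the curl output above.
    private static final String CANARY_MARKER = "\"domain\":\"canary\"";

    // Returns the JSON payloads of SSE "data:" lines that look like canary events.
    static List<String> canaryPayloads(Stream<String> sseLines) {
        return sseLines
                .filter(line -> line.startsWith(DATA_PREFIX))
                .map(line -> line.substring(DATA_PREFIX.length()))
                .filter(json -> json.contains(CANARY_MARKER))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Illustrative SSE input, trimmed down from the events shown above.
        Stream<String> sample = Stream.of(
                "event: message",
                "data: {\"$schema\":\"/mediawiki/recentchange/1.0.1\",\"meta\":{\"stream\":\"mediawiki.recentchange\",\"domain\":\"canary\"}}",
                "",
                "event: message",
                "data: {\"bot\":false,\"meta\":{\"domain\":\"en.wikipedia.org\"}}");
        canaryPayloads(sample).forEach(System.out::println);
    }
}
```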