apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connectors-v2] NullPointerException Due to Incomplete Metadata Extraction in SeaTunnel Kafka #8053

Open Han-lai opened 3 days ago

Han-lai commented 3 days ago

Search before asking

What happened

I am encountering a NullPointerException issue while working with the SeaTunnel Kafka connector. I believe this issue might be related to the incomplete implementation of metadata extraction, which was initially proposed in Issue #2755 by hailin0.

In the proposal, there were several features outlined for improvement, such as:

Although PR #3501 has already addressed some of these aspects by supporting the extraction of metadata like the message key, timestamp, and headers, the full implementation, particularly the extraction of topics, partitions, and handling all metadata consistently, has not been completed.

Given that the full extraction of metadata (including topic and partition information) has not been fully implemented, I suspect that the NullPointerException might be a result of missing metadata in the processing pipeline, especially when the connector attempts to access the topic and partition details.

Could the team prioritize the completion of these features, particularly the extraction of topic, partition, and other metadata from SeaTunnelRow, and ensure that metadata is consistently handled when reading and writing Kafka messages? This would not only help resolve potential issues like the NullPointerException but also enhance the flexibility and efficiency of the Kafka connector for developers.

SeaTunnel Version

2.3.7

SeaTunnel Config

none

Running Command

"/bin/sh", "-c", "/opt/seatunnel/bin/seatunnel-cluster.sh"

Error Exception

java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation.run(GetJobMetricsOperation.java:87) ~[seatunnel-starter.jar:2.3.7]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.7]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.7]
    ... (remaining stack trace omitted)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_342]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_342]
    at org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation.run(GetJobMetricsOperation.java:85) ~[seatunnel-starter.jar:2.3.7]
    ... (remaining stack trace omitted)

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

liunaijie commented 3 days ago

I hit a similar issue today. My version is 2.3.5, and my error stack is:

java.util.concurrent.CompletionException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor.convertToJson(RestHttpGetCommandProcessor.java:392) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor.lambda$handleRunningJobsInfo$2(RestHttpGetCommandProcessor.java:173) ~[seatunnel-starter.jar:2.3.5]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_342]

The call comes from the REST API endpoint /hazelcast/rest/maps/running-jobs.

It looks like the NPE is generated here: https://github.com/apache/seatunnel/blob/dev/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java#L186

If the job is not running and also cannot be found in the history service, the lookup returns null, so the subsequent jobMetrics.getMetrics() call is effectively null.getMetrics() and throws the NPE.
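To illustrate the failure mode, here is a minimal sketch of a null-safe lookup. It is not SeaTunnel code: the class MetricsLookupSketch, the Metrics type, the two maps, and the "{}" fallback are all hypothetical stand-ins, assumed only to show how a job that is found in neither the running set nor the history could be handled without dereferencing null.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these names come from the SeaTunnel code base.
final class MetricsLookupSketch {

    /** Stand-in for a metrics holder that can render itself as JSON. */
    static final class Metrics {
        private final String json;

        Metrics(String json) {
            this.json = json;
        }

        String toJson() {
            return json;
        }
    }

    // Assumed storage: one map for running jobs, one for finished (history) jobs.
    private final Map<Long, Metrics> running = new ConcurrentHashMap<>();
    private final Map<Long, Metrics> finished = new ConcurrentHashMap<>();

    void putRunning(long jobId, Metrics metrics) {
        running.put(jobId, metrics);
    }

    void putFinished(long jobId, Metrics metrics) {
        finished.put(jobId, metrics);
    }

    /** Looks in the running jobs first, then in history; empty if the job is in neither. */
    Optional<Metrics> find(long jobId) {
        Metrics metrics = running.get(jobId);
        if (metrics == null) {
            metrics = finished.get(jobId);
        }
        return Optional.ofNullable(metrics);
    }

    /** Returns the metrics JSON, or an empty JSON object instead of throwing an NPE. */
    String metricsJson(long jobId) {
        return find(jobId).map(Metrics::toJson).orElse("{}");
    }

    public static void main(String[] args) {
        MetricsLookupSketch lookup = new MetricsLookupSketch();
        lookup.putRunning(1L, new Metrics("{\"rows\":10}"));
        System.out.println(lookup.metricsJson(1L));  // job is known
        System.out.println(lookup.metricsJson(99L)); // job is unknown, falls back to "{}"
    }
}
```

Whether an empty result, an error response, or a retry is the right behavior for an unknown job is a design decision for the maintainers; the sketch only shows that the missing-job case needs an explicit branch.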