OpenLineage / OpenLineage

An Open Standard for lineage metadata collection
http://openlineage.io
Apache License 2.0

[Integration][Flink] RunEvent created with null `eventTime`, `run` and `job` #1820

Closed: majunting closed this issue 1 year ago

majunting commented 1 year ago

Hi, thank you for your efforts developing Flink integration with OpenLineage.

I'm currently trying to use OpenLineage to collect Flink run information. Following the provided examples, I added the job listener and registered it with the StreamExecutionEnvironment. However, when I run the Flink job, I get a "Failed to emit OpenLineage event" error: Marquez receives the event and responds with error code 422. The error occurs when OpenLineageFlinkJobListener executes onJobSubmitted. The stack trace is below:

04/05 14:37:16.969 [pool-4-thread-1] ERROR io.openlineage.flink.client.EventEmitter - Failed to emit OpenLineage event:
io.openlineage.client.OpenLineageClientException: code: 422, response: {"errors":["job must not be null","eventTime must not be null","run must not be null"]}
    at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:145)
    at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:119)
    at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46)
    at io.openlineage.flink.client.EventEmitter.emit(EventEmitter.java:37)
    at io.openlineage.flink.visitor.lifecycle.FlinkExecutionContext.onJobSubmitted(FlinkExecutionContext.java:59)
    at io.openlineage.flink.OpenLineageFlinkJobListener.start(OpenLineageFlinkJobListener.java:82)
    at io.openlineage.flink.OpenLineageFlinkJobListener.onJobSubmitted(OpenLineageFlinkJobListener.java:64)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$executeAsync$12(StreamExecutionEnvironment.java:2099)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2099)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1983)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1951)
    at aero.airlab.flink.systrk.irstoopenatms.IntegrationTest$EnvExecutor.execute$lambda-0(IntegrationTest.kt:130)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

After stepping through in debug mode, I found that in class FlinkExecutionContext, the function buildEventForEventType is called for both job start and checkpoint events. However, its return statement is:

return openLineageContext
    .getOpenLineage()
    .newRunEventBuilder()
    .inputs(inputDatasets)
    .outputs(outputDatasets)
    .eventType(eventType);

Since only inputs, outputs and eventType are set, the other three fields (eventTime, run and job) remain null, which produces the error above.
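To make the failure concrete, here is a minimal sketch of what a complete event carries. It uses hypothetical stand-in records, not the real openlineage-java classes, reduced to the fields discussed in this issue; buildEvent, the namespace and the job name are illustrative assumptions. The point is only that eventTime, run and job have to be set alongside inputs, outputs and eventType.

```java
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-ins for the OpenLineage model, reduced to the fields in this issue.
record Run(UUID runId) {}
record Job(String namespace, String name) {}
record RunEvent(String eventType, ZonedDateTime eventTime, Run run, Job job,
                List<String> inputs, List<String> outputs) {}

public class CompleteRunEvent {
    // Hypothetical helper: sets every field named in the Marquez error,
    // not just inputs, outputs and eventType. A real integration would take
    // runId, namespace and jobName from the Flink job context.
    static RunEvent buildEvent(String eventType, UUID runId, String namespace, String jobName,
                               List<String> inputs, List<String> outputs) {
        return new RunEvent(
                eventType,
                ZonedDateTime.now(),          // eventTime: when the event is emitted
                new Run(runId),               // run: identifies this execution
                new Job(namespace, jobName),  // job: namespace + name identify the job
                inputs,
                outputs);
    }

    public static void main(String[] args) {
        RunEvent event = buildEvent("START", UUID.randomUUID(), "flink-jobs", "example-job",
                List.of("input-topic"), List.of("output-topic"));
        // prints true: none of the three previously missing fields is null
        System.out.println(event.eventTime() != null && event.run() != null && event.job() != null);
    }
}
```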

It would be great if you could give me some advice on this issue. I look forward to hearing from you! Thank you!

boring-cyborg[bot] commented 1 year ago

Thanks for creating your first OpenLineage issue! Your feedback is valuable and improves the project. If you haven't already, please be sure to follow the issue template!

pawel-big-lebowski commented 1 year ago

Hi @majunting, you're totally right!

Our integration tests verify whether OpenLineage events contain input/output dataset information, but we don't verify whether they're valid OL events 🤦

And they're not, as they're missing the fields listed in your error:

{"errors":["job must not be null","eventTime must not be null","run must not be null"]}

Thanks for debugging this. I will fix it in the next few days.
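The test gap described here (checking datasets but not event validity) could be closed with a check on required top-level fields. A minimal sketch, assuming the emitted event has already been parsed into a Map; RunEventCheck and missingFields are hypothetical names, and only the three fields from the Marquez error are checked (the full OpenLineage spec also requires fields such as producer and schemaURL).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RunEventCheck {
    // Hypothetical subset: the top-level fields Marquez reported as missing.
    static final List<String> REQUIRED = List.of("eventTime", "run", "job");

    // Returns messages in the same "<field> must not be null" style as the 422 response.
    static List<String> missingFields(Map<String, Object> event) {
        List<String> errors = new ArrayList<>();
        for (String field : REQUIRED) {
            if (event.get(field) == null) {
                errors.add(field + " must not be null");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Shape of the event built by buildEventForEventType before the fix.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("eventType", "START");
        event.put("inputs", List.of());
        event.put("outputs", List.of());
        // prints [eventTime must not be null, run must not be null, job must not be null]
        System.out.println(missingFields(event));
    }
}
```

An integration test could then assert that missingFields returns an empty list for every captured event.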

merobi-hub commented 1 year ago

Hi @majunting, thanks for opening an issue! Please join us on Slack if you haven't already.