streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0

[BUG] Question about Flink Event-time support. #615

Open moweonlee opened 2 years ago

moweonlee commented 2 years ago

Describe the bug

I think I have found a problem when running a Flink job with the pulsar-connector, specifically in Flink's TimeCharacteristic.EventTime mode.

I suspect the connector because the same code works correctly with event-time windows when I use a Kafka source.

With the Pulsar connector, however, Flink does not seem to recognize that the TimeCharacteristic is EventTime.

The Flink dashboard also indicates that the job is not running in event-time window mode, as shown below.

[Screenshot of the Flink dashboard]

I have verified that the timestamps I extract from the source data look normal:

[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
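As a sanity check on these values, here is a minimal plain-Java sketch (not Flink code; the class and method names are hypothetical) of the arithmetic a tumbling event-time window with offset 0 uses to assign a timestamp to the window [start, start + size), with start = ts - (ts mod size). With the job's 10-second window, all eight logged records land in the same window, [1662911840000, 1662911850000).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustration only: groups the logged timestamps into 10 s tumbling windows
 * (offset 0), mirroring how a tumbling event-time window computes its start.
 */
public class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 10_000L; // Time.seconds(10) in the job

    /** Start of the tumbling window that contains ts (offset 0). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    public static void main(String[] args) {
        // Timestamps copied from the debug log above (duplicates included).
        List<Long> logged = List.of(
                1662911844581L, 1662911844581L, 1662911845321L, 1662911845321L,
                1662911845307L, 1662911845307L, 1662911844954L, 1662911844954L);

        // Count records per window start, ordered by start time.
        Map<Long, Integer> countsPerWindow = new TreeMap<>();
        for (long ts : logged) {
            countsPerWindow.merge(windowStart(ts, WINDOW_SIZE_MS), 1, Integer::sum);
        }
        countsPerWindow.forEach((start, n) -> System.out.println(
                "[" + start + ", " + (start + WINDOW_SIZE_MS) + "): " + n + " records"));
    }
}
```

Running it prints a single line for the window [1662911840000, 1662911850000) containing 8 records, so the timestamps themselves are consistent with a single 10-second window.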

Could you please let me know whether the Pulsar connector supports watermarking code like mine?
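For reference, a minimal sketch of the watermark arithmetic behind forBoundedOutOfOrderness(Duration.ofMillis(200)) on one parallel instance, assuming the generator tracks the largest timestamp seen and reports watermark = maxTimestamp - 200 - 1. It mirrors the idea of Flink's bounded-out-of-orderness generator but is a hypothetical stand-alone class; unlike Flink, it computes the watermark after every event instead of on the periodic auto-watermark interval. An event-time window fires once the watermark reaches its last millisecond (end - 1), so with the logged timestamps the watermark only reaches 1662911845120, and the window ending at 1662911850000 cannot fire until a record with a timestamp of at least 1662911850200 has been seen.

```java
import java.time.Duration;
import java.util.List;

/**
 * Illustration only: watermark = max seen timestamp - maxOutOfOrderness - 1,
 * evaluated for a single parallel instance.
 */
public class WatermarkSketch {

    private final long outOfOrdernessMs;
    private long maxTimestamp;

    WatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMs = maxOutOfOrderness.toMillis();
        // Chosen so the initial watermark is exactly Long.MIN_VALUE (no overflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    /** Records an event timestamp; only the maximum matters. */
    void onEvent(long ts) {
        maxTimestamp = Math.max(maxTimestamp, ts);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofMillis(200)); // as in the job
        for (long ts : List.of(1662911844581L, 1662911845321L, 1662911845307L, 1662911844954L)) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
        long windowMaxTimestamp = 1662911850000L - 1; // last millisecond of the 10 s window
        System.out.println("window fires: " + (wm.currentWatermark() >= windowMaxTimestamp));
    }
}
```

Note that 1662911844954 arrives 367 ms behind the maximum, more than the 200 ms bound, but it is still counted because its window has not fired yet.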

Here is the code I have tested so far:

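import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;

// Debug, LCSSTMSUrlMapperRefined and LCSSTMSProcessor are application classes (not shown).

public class StreamRealtime {

    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart up to 4 times, waiting 10 s between attempts
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Use event time (deprecated since Flink 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String adminServiceUrl  = "http://10.96.77.102:31143";
        String brokerServiceUrl = "pulsar://10.96.77.102:32543";
        String inputTopic       = "persistent://nds/nds/lcs-refined-topic";

        int parallelism = 30;
        env.setParallelism(parallelism);

        Properties properties = new Properties();
        properties.setProperty("topic", inputTopic);

        FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
            brokerServiceUrl,
            adminServiceUrl,
            new SimpleStringSchema(),
            properties
        ).setStartFromLatest();

        // Parse each message into a Tuple3 whose f2 field is the event time (epoch millis)
        DataStream<Tuple3<String, String, Long>> stream = env
                .addSource(source)
                .setParallelism(parallelism)
                .flatMap(LCSSTMSUrlMapperRefined.create())
                .setParallelism(parallelism);

        // Assign event timestamps from f2 and watermarks tolerating 200 ms of disorder
        stream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(200))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                        // Debug output that produced the log lines above
                                        Debug.sendUDP("timstamp : [" + element.f2 + "]\n");
                                        return element.f2;
                                }
                        })
                )
                .setParallelism(parallelism)
                .keyBy(0)                                              // key by tuple field f0
                .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10 s event-time windows
                .apply(LCSSTMSProcessor.create(0))                     // no sink is attached in this excerpt
                .setParallelism(parallelism);

        env.execute("Pulsar NDS Streaming");
    }
}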
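One detail the report does not state is how many partitions lcs-refined-topic has. If it has fewer than the configured parallelism of 30 (an assumption, not something the logs show), some source subtasks never receive data. Because keyBy connects every upstream subtask to every window subtask, and an operator's event time is the minimum watermark over its input channels, those silent channels could keep the event-time windows from ever firing. The plain-Java sketch below (not Flink code; the names and the split of 4 busy channels are hypothetical) simulates that effect. In Flink itself, WatermarkStrategy#withIdleness(Duration) (available since Flink 1.11) marks inputs idle so they are excluded, for example forBoundedOutOfOrderness(Duration.ofMillis(200)).withIdleness(Duration.ofSeconds(30)), where the 30-second timeout is purely illustrative.

```java
import java.util.Arrays;

/**
 * Illustration only: a downstream operator's event-time clock is the minimum
 * watermark over its non-idle input channels. Channels that never see data stay
 * at Long.MIN_VALUE and hold the clock back unless they are treated as idle.
 */
public class MinWatermarkSketch {

    /** Minimum watermark over channels not flagged idle; Long.MIN_VALUE if none are active. */
    static long combinedWatermark(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        int parallelism = 30;  // as in the job
        int busyChannels = 4;  // HYPOTHETICAL: e.g. a topic with only 4 partitions

        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE);   // channels that never received data
        for (int i = 0; i < busyChannels; i++) {
            watermarks[i] = 1662911850300L;         // already past the window's last millisecond
        }

        boolean[] noneIdle = new boolean[parallelism];
        boolean[] emptyChannelsIdle = new boolean[parallelism];
        for (int i = busyChannels; i < parallelism; i++) {
            emptyChannelsIdle[i] = true;
        }

        long windowMaxTimestamp = 1662911849999L;   // end - 1 of [..840000, ..850000)
        System.out.println("without idleness: fires="
                + (combinedWatermark(watermarks, noneIdle) >= windowMaxTimestamp));
        System.out.println("with idleness:    fires="
                + (combinedWatermark(watermarks, emptyChannelsIdle) >= windowMaxTimestamp));
    }
}
```

In this simulation the window does not fire while the empty channels count toward the minimum, and does fire once they are excluded.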

To Reproduce

Steps to reproduce the behavior:

  1. Wrote a Flink event-time aggregation job using the pulsar-connector.
  2. Checked whether the messages have valid timestamps (OK, they do).
  3. Checked whether my code works with Flink processing-time windows and the pulsar-connector after making the following changes (OK, it works):
    • Removed the watermark assigner and switched the windowing to processing time
    • Changed the configuration from TimeCharacteristic.EventTime to TimeCharacteristic.ProcessingTime

Expected behavior

Event-time windows should fire with the Pulsar source, as they do with the Kafka source.

Screenshots

Posted with the description above.

Additional context