xuwei517 / FlinkExample

Flink code examples

kafka + flink + watermark generation error #1

Open quguiliang opened 2 months ago

quguiliang commented 2 months ago

Hello teacher: after reading Kafka data with Flink, when I use watermarks to handle out-of-order events, IDEA keeps reporting that assignTimestampsAndWatermarks() has multiple overloads. The code is as follows:

object Test {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Configure the Kafka consumer
val properties = System.getProperties
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// Create the Kafka source
val kafkaSource = new FlinkKafkaConsumer[String]("test_topic", new SimpleStringSchema(), properties)

// Add a watermark generator
val watermarkGenerator = new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(10)) {
  override def extractTimestamp(element: String): Long = {
    // Parse the timestamp out of the message; assumes the format "message,timestamp"
    val parts = element.split(",")
    parts(1).toLong
  }
}

// Convert the FlinkKafkaConsumer into a DataStream
val stream = env.addSource(kafkaSource)
  .map(new MapFunction[String, (String, Long)] {
    override def map(value: String): (String, Long) = {
      val parts = value.split(",")
      (parts(0), parts(1).toLong)
    }
  })
  .assignTimestampsAndWatermarks(watermarkGenerator)

// Process the data
stream.print()

// Start the Flink job
env.execute("Flink Kafka Example")

  }
}
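One thing worth checking in the code above (a guess, since the error screenshot is not included): after the `.map`, the stream's element type is `(String, Long)`, but `watermarkGenerator` is a `BoundedOutOfOrdernessTimestampExtractor[String]`, so no overload of `assignTimestampsAndWatermarks()` can accept it, and the IDE reports the whole overload set. A minimal, Flink-free sketch of an extractor typed to match the tuple elements (names here are illustrative):

```scala
object TupleTimestampSketch {
  // Assumption: after the .map in the question, the stream element type is
  // (String, Long), so the timestamp extractor must also be typed over
  // (String, Long) -- a String-typed extractor matches no overload there.
  def extractTimestamp(element: (String, Long)): Long = element._2

  def main(args: Array[String]): Unit = {
    // Simulate the map step from the question: "message,timestamp" -> tuple
    val records = Seq("a,1000", "b,2000").map { line =>
      val parts = line.split(",")
      (parts(0), parts(1).toLong)
    }
    records.map(extractTimestamp).foreach(println) // prints 1000 then 2000
  }
}
```

In the real job this would mean declaring the extractor as `BoundedOutOfOrdernessTimestampExtractor[(String, Long)]` (or, since that class is deprecated in Flink 1.13, a `WatermarkStrategy[(String, Long)]`), but that is only a hypothesis until the exact compiler message is known.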

The Maven configuration I am using is as follows:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.13.2</version>
    </dependency>

[screenshot: error message] The error message is shown in the screenshot above. This problem has been bothering me for days now; looking forward to your reply, teacher!

quguiliang commented 2 months ago

If Flink reads from a socket and then uses watermarks for out-of-order handling, no error is reported. The error above appears only when the stream comes from the Kafka source. The Kafka and socket code is shown here: [screenshot: Kafka and socket source code]
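One explanation consistent with this observation (an assumption, since the socket code is only in the screenshot): `socketTextStream` yields a `DataStream[String]`, so if the extractor is applied there before any map to a tuple, a `String`-typed extractor matches the element type and compiles. A Flink-free sketch of that String-typed extraction logic:

```scala
object StringTimestampSketch {
  // Assumption: the socket version applies the extractor directly to a
  // DataStream[String], so an extractor typed over String type-checks --
  // unlike the Kafka path, where the map step changes the element type.
  def extractTimestamp(element: String): Long = element.split(",")(1).toLong

  def main(args: Array[String]): Unit = {
    println(extractTimestamp("hello,1690000000000")) // prints 1690000000000
  }
}
```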