Experts, why is b 3 in the output of this Flink job?
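import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Assumed setup: the original post uses `env` without showing how it is created
val env = StreamExecutionEnvironment.getExecutionEnvironment

val data: DataStream[(Int, String, String)] = env.fromElements(
  (1, "a", "2024-10-01 12:00:01"),
  (2, "a", "2024-10-01 12:00:02"),
  (3, "b", "2024-10-01 12:00:02"),
  (4, "c", "2024-10-01 12:00:03"),
  (5, "c", "2024-10-01 12:00:05"),
  (6, "b", "2024-10-01 12:00:02"),
  (7, "d", "2024-10-01 12:00:08"), // meant to advance the watermark past the window end at 12:00:05
  (8, "b", "2024-10-01 12:00:04")  // late event, expected to be dropped
)
// Allow up to 2 seconds of out-of-orderness
val wm: WatermarkStrategy[(Int, String, String)] =
  WatermarkStrategy.forBoundedOutOfOrderness[(Int, String, String)](Duration.ofSeconds(2))
    .withTimestampAssigner(new SerializableTimestampAssigner[(Int, String, String)] {
      override def extractTimestamp(element: (Int, String, String), recordTimestamp: Long): Long = {
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        // Use the third field as the event time
        format.parse(element._3).getTime
      }
    })
val rlts: DataStream[(String, Int)] = data.assignTimestampsAndWatermarks(wm)
  .map(m => (m._2, 1))
  .keyBy(k => k._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sum(1)
rlts.print()
env.execute() // assumed: not shown in the original post, but required to run the job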
Why does running the Flink code above print:
(a,2)
(b,3)
(c,1)
(c,1)
(d,1)
rather than:
(a,2)
(b,2)
(c,1)
(c,1)
(d,1)?
Shouldn't the late record (8, "b", "2024-10-01 12:00:04") be dropped?
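One likely explanation can be checked with a small simulation that does not need Flink. The watermark generator behind forBoundedOutOfOrderness emits watermarks periodically (every 200 ms by default, the auto-watermark interval), not after each record. A bounded fromElements source normally finishes well inside that interval, so no watermark is emitted while the eight records are processed and none of them is treated as late. The sketch below is a simplified model under stated assumptions: timestamps are written as milliseconds after 12:00:00, the watermark is taken as max timestamp minus the delay minus 1 ms, and a record is dropped when the last timestamp of its window is at or below the current watermark. The names LatenessSketch, simulate, and watermarkPerEvent are hypothetical and exist only for illustration.

```scala
object LatenessSketch {
  // (id, key, event time in ms after 12:00:00), mirroring the eight records in the post
  val events: Seq[(Int, String, Long)] = Seq(
    (1, "a", 1000L), (2, "a", 2000L), (3, "b", 2000L), (4, "c", 3000L),
    (5, "c", 5000L), (6, "b", 2000L), (7, "d", 8000L), (8, "b", 4000L)
  )

  val windowSizeMs = 5000L        // 5-second tumbling window
  val maxOutOfOrdernessMs = 2000L // 2-second delay from the post

  // Start of the tumbling window (offset 0) that contains ts
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Returns the record count per (key, windowStart).
  // watermarkPerEvent = true  : the watermark advances after every record
  // watermarkPerEvent = false : the watermark stays at its initial value
  //                             until all records have been processed
  def simulate(watermarkPerEvent: Boolean): Map[(String, Long), Int] = {
    var maxTs = Long.MinValue
    var watermark = Long.MinValue
    val counts =
      scala.collection.mutable.Map.empty[(String, Long), Int].withDefaultValue(0)

    for ((_, key, ts) <- events) {
      val start = windowStart(ts)
      val windowMaxTs = start + windowSizeMs - 1
      // Assumed lateness rule with zero allowed lateness
      val late = windowMaxTs <= watermark
      if (!late) counts((key, start)) += 1

      maxTs = math.max(maxTs, ts)
      if (watermarkPerEvent) watermark = maxTs - maxOutOfOrdernessMs - 1
    }
    counts.toMap
  }

  def main(args: Array[String]): Unit = {
    println("watermark after every record: " + simulate(watermarkPerEvent = true))
    println("watermark only at end of input: " + simulate(watermarkPerEvent = false))
  }
}
```

In this model, advancing the watermark after every record leaves two b records in the 12:00:00 window, because record 8 arrives after the watermark has passed the window end. Keeping the watermark at its initial value leaves three, which matches the printed (b,3). If the drop is the desired behavior, one option is a custom WatermarkGenerator that emits the watermark in onEvent instead of only in onPeriodicEmit. A source that delivers records more slowly than the watermark interval would probably also show it.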
Posted 2025-03-13 05:03