package com.coder.flink.restart
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Demo of three ways to attach event-time timestamps and watermarks to a
 * stream of `(name, timestampMillis, count)` tuples:
 *
 *  1. a custom [[SourceFunction]] that emits timestamps and watermarks itself,
 *  2. a [[BoundedOutOfOrdernessTimestampExtractor]] with a fixed lateness bound,
 *  3. `assignAscendingTimestamps` for input whose timestamps never decrease.
 *
 * Only the third variant is connected to the windowed aggregation that is
 * printed; the first two are constructed for illustration only.
 */
object WaterMarkDemo {

  /**
   * Builds the demo pipeline and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Imported locally so the file-level import block does not need to change.
    import org.apache.flink.streaming.api.TimeCharacteristic

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The window below is meant to be driven by the timestamps and watermarks
    // assigned in this job. On Flink versions whose default time characteristic
    // is processing time, omitting this would turn `timeWindow` into a
    // wall-clock window and the assigned timestamps would be ignored.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Tuple layout: (_1 = name, _2 = event timestamp, _3 = count).
    val input = List(("a", 2L, 1), ("b", 2L, 1), ("c", 3L, 1))

    // Variant 1: the source attaches timestamps and emits watermarks itself.
    // The resulting stream is intentionally not wired to any downstream
    // operator or sink, so it does not contribute to the printed output.
    env.addSource(new SourceFunction[(String, Long, Int)] {
      override def run(ctx: SourceFunction.SourceContext[(String, Long, Int)]): Unit = {
        input.foreach { value =>
          // Standard pattern: collectWithTimestamp names the field that
          // carries the event time of the element.
          ctx.collectWithTimestamp(value, value._2)
          // The watermark trails the element's timestamp by 1, i.e. a maximum
          // lateness of 1 is tolerated.
          ctx.emitWatermark(new Watermark(value._2 - 1))
        }
        // Final watermark, emitted once every element has been produced.
        ctx.emitWatermark(new Watermark(Long.MaxValue))
      }

      // Nothing to interrupt: run() only iterates over a finite list.
      override def cancel(): Unit = {}
    })

    // Same elements as the custom source above, read as a bounded collection.
    val data: DataStream[(String, Long, Int)] = env.fromCollection(input)

    // Variant 2: extractor that tolerates elements arriving up to 10 seconds
    // out of order. The returned stream is not used further in this demo.
    data.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(10)) {
        // Event-time extraction: the second tuple field is the timestamp.
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2
      })

    // Variant 3: built-in ascending-timestamp assigner; only the timestamp
    // field has to be named. This is the stream that is actually aggregated.
    val watermark: DataStream[(String, Long, Int)] = data.assignAscendingTimestamps(t => t._2)

    // NOTE(review): keyBy(1) keys on tuple index 1, which is the timestamp
    // field, not the name at index 0 - confirm this is the intended key.
    // Sums tuple index 2 (the count) per key within 1-second windows.
    val rs = watermark.keyBy(1).timeWindow(Time.seconds(1L)).sum(2)
    rs.print()
    // data.print()
    env.execute()
  }
}
|