Flink,水印的几种简单写法

论坛 期权论坛 脚本     
已经匿名的用户   2022-7-2 22:16   6004   0
package com.coder.flink.restart


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Demonstrates three ways of attaching event-time timestamps and watermarks
 * to a Flink stream of `(String, Long, Int)` tuples, where the second field
 * is used as the event timestamp:
 *
 *  1. emitting timestamps and watermarks directly from a `SourceFunction`;
 *  2. a `BoundedOutOfOrdernessTimestampExtractor` with a fixed lateness bound;
 *  3. `assignAscendingTimestamps` for input whose timestamps never decrease.
 *
 * Only the stream produced by approach 3 is windowed and printed; the streams
 * built by approaches 1 and 2 are shown for illustration and their results
 * are not used further.
 */
object WaterMarkDemo {

  /**
   * Builds and executes the demo job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Imported locally so this object does not depend on the file header.
    import org.apache.flink.streaming.api.TimeCharacteristic

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The windows below are meant to be driven by the assigned timestamps and
    // watermarks. On Flink versions whose default time characteristic is
    // processing time, `timeWindow` would otherwise create processing-time
    // windows and ignore the watermarks, so event time is enabled explicitly.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val input = List(("a", 2L, 1), ("b", 2L, 1), ("c", 3L, 1))

    // Approach 1: the source itself attaches timestamps and emits watermarks.
    // NOTE: the returned stream is not consumed by any downstream operator.
    env.addSource(new SourceFunction[(String, Long, Int)] {
      override def run(ctx: SourceFunction.SourceContext[(String, Long, Int)]): Unit = {
        input.foreach { value =>
          // collectWithTimestamp emits the record with the second tuple field
          // as its event timestamp.
          ctx.collectWithTimestamp(value, value._2)
          // The watermark trails the record's timestamp by 1.
          ctx.emitWatermark(new Watermark(value._2 - 1))
        }
        // Final watermark with the largest possible value, emitted once all
        // records have been produced.
        ctx.emitWatermark(new Watermark(Long.MaxValue))
      }

      // Nothing to interrupt: run() only iterates over a finite list.
      override def cancel(): Unit = {}
    })

    // The same records as `input`, this time as a collection-backed stream.
    val data: DataStream[(String, Long, Int)] = env.fromCollection(input)

    // Approach 2: extractor constructed with a 10-second out-of-orderness bound.
    // NOTE: the returned stream is discarded; `data` itself is unchanged.
    data.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(10)) {

        // Event-time extraction: the second tuple field is the timestamp.
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2
      })

    // Approach 3: built-in ascending-timestamp assigner; only the timestamp
    // field has to be specified.
    val watermark: DataStream[(String, Long, Int)] = data.assignAscendingTimestamps(t => t._2)

    // Key by tuple position 1, window by 1 second, and sum tuple position 2.
    val rs = watermark.keyBy(1).timeWindow(Time.seconds(1L)).sum(2)
    rs.print()
    //    data.print()
    env.execute()
  }
}
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP