跳至主要內容

Flink时间概念和无序数据处理

xw大约 6 分钟大数据JAVAFlink

概念

Flink里面时间分类

  • 事件时间EventTime
    • 事件发生的时间
    • 事件时间是每个单独事件在其产生进程上发生的时间,这个时间通常在记录进入 Flink 之前记录在对象中
    • 在事件时间中,时间值 取决于数据产生记录的时间,而不是任何Flink机器上的
  • 进入时间 IngestionTime
    • 事件到进入Flink
  • 处理时间ProcessingTime
    • 事件被flink处理的时间
    • 指正在执行相应操作的机器的系统时间
    • 是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟
    • 但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题

设置时间语义:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

乱序时间处理

一般我们都是用EventTime事件时间进行处理统计数据,但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确。比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达,当 12:01:10 秒数据到达的时候,不立刻触发窗口计算,而是等一定的时间,等迟到的数据来后再关闭窗口进行计算。

  • Watermark 水位线介绍

    • 由flink的某个operator操作生成后,就在整个程序中随event数据流转
      • With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
      • With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
    • 衡量数据是否乱序的时间,什么时候不用等早之前的数据
    • 是一个全局时间戳,不是某一个key下的值
    • 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
    • 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
    • 注意
      • Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
      • 需要经过测试,和业务相关联,得出一个较合适的值即可
  • 窗口触发计算的时机

    • watermark之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )
    • watermark之后,触发计算的时机
      • 窗口内有数据
      • Watermaker >= Window EndTime窗口结束时间
    • 触发计算后,其他窗口内数据再到达也被丢弃
    • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
  • 数据流中的事件是有序

  • 数据流中的事件是无序

案例

  • window大小为10s,窗口是W1 [23:12:00~23:12:10) 、 W2[23:12:10~23:12:20)
    • 下面是数据的event time
    • 数据A 23:12:07
    • 数据B 23:12:11
    • 数据C 23:12:08
    • 数据D 23:12:17
    • 数据E 23:12:09
  • 没加入watermark,由上到下进入flink
    • 数据B到了之后,W1就进行了窗口计算,数据只有A
    • 数据C 迟到了3秒,到了之后,由于W1已经计算了,所以就丢失了数据C
  • 加入watermark, 允许5秒延迟乱序,由上到下进入flink
    • 数据A到达
      • watermark = 12:07 - 5 = 12:02 < 12:10 ,所以不触发W1计算, A属于W1
    • 数据B到达
      • watermark = max{ 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, B属于W2
    • 数据C到达
      • watermark = max{12:08, 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, C属于W1
    • 数据D到达
      • watermark = max{12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 触发W1计算, D属于W2
    • 数据E到达
      • watermark = max{12:09, 12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 之前已触发W1计算, 所以丢失了E数据,
  • Watermaker 计算 = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
  • 什么时候触发W1窗口计算
    • Watermaker >= Window EndTime窗口结束时间
    • 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间

代码

  • Watermark 一次数据兜底
  • allowedLateness 二次数据兜底
  • OutputTag 最后数据兜底
public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //java,2022-11-11 09-10-10,15
        DataStream<String> ds = env.socketTextStream("127.0.0.1", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));

            }
        });

        //指定watermark
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarkDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
                //指定允许乱序延迟的最大时间 3 秒
                .<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                //指定POJO事件时间列,毫秒
                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));




        //最后的兜底数据
        OutputTag<Tuple3<String, String, Integer>> lateData = new OutputTag<Tuple3<String, String, Integer>>("lateDataOrder"){};

        //分组 开窗
        SingleOutputStreamOperator<String> sumDS = watermarkDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        })
                //开窗
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))

                //允许1分钟延迟
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateData)

                //聚合, 方便调试拿到窗口全部数据,全窗口函数
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<String> out) throws Exception {

                        //准备list,存储窗口的事件时间
                        List<String> timeList = new ArrayList<>();
                        int total = 0;
                        for(Tuple3<String, String, Integer> order:input){
                            timeList.add(order.f1);
                            total = total+order.f2;
                        }

                        String outStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key,total, TimeUtil.format(window.getStart()),TimeUtil.format(window.getEnd()), timeList);
                        out.collect(outStr);

                    }
                });

        sumDS.print();

        //最后兜底处理
        sumDS.getSideOutput(lateData).print("late data order");

        env.execute("watermark job");

    }

总结

  • 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。

  • 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算

  • 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出

  • 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据