跳至主要內容

Flink窗口及增量聚合

xw大约 4 分钟大数据JAVAFlink

概述

数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等,状态是指一个Operator的运行的状态/历史值,是维护在内存中。一个算子的子任务接收输入流,获取对应的状态,计算新的结果,然后把结果更新到状态里面。

窗口

数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等,Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算。

分类

  • time Window 时间窗口,即按照一定的时间规则作为窗口统计
    • time-tumbling-window 时间滚动窗口 (用的多)
    • time-sliding-window 时间滑动窗口 (用的多)
    • session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用
  • count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用

滑动窗口 Sliding Windows

  • 窗口具有固定大小
  • 窗口数据有重叠
  • 例子:每10s统计一次最近1min内的订单数量

滚动窗口 Tumbling Windows

  • 窗口具有固定大小
  • 窗口数据不重叠
  • 例子:每10s统计一次最近10s内的订单数量

image-20210725145415262

增量聚合和全窗口函数

  • 增量聚合函数,aggregate(agg函数,WindowFunction(){ })

  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中

  • 常见的增量聚合函数有 reduceFunction、aggregateFunction

  • min、max、sum 都是简单的聚合操作,不需要自定义规则

    AggregateFunction<IN, ACC, OUT>
    IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
    
  • 使用案例:

    SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {
                        //累加器初始化
                        @Override
                        public VideoOrder createAccumulator() {
                            VideoOrder videoOrder = new VideoOrder();
                            return videoOrder;
                        }
                        //聚合方式
                        @Override
                        public VideoOrder add(VideoOrder value, VideoOrder accumulator) {
                            accumulator.setMoney(value.getMoney()+accumulator.getMoney());
                            accumulator.setTitle(value.getTitle());
                            if(accumulator.getCreateTime()==null){
                                accumulator.setCreateTime(value.getCreateTime());
                            }
                            return accumulator;
                        }
                        //获取结果
                        @Override
                        public VideoOrder getResult(VideoOrder accumulator) {
                            return accumulator;
                        }
                        //合并内容,一般不用
                        @Override
                        public VideoOrder merge(VideoOrder a, VideoOrder b) {
                            VideoOrder videoOrder = new VideoOrder();
                            videoOrder.setMoney(a.getMoney()+b.getMoney());
                            return videoOrder;
                        }
                    });
            aggDS.print();
    
  • 全窗口函数

  • apply(new WindowFunction(){ })
    
    • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
    • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
    IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
    WindowFunction<IN, OUT, KEY, W extends Window>
    
    
  • 代码案例:

    SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            }).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                    .apply( new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                        @Override
                        public void apply(String key, TimeWindow window, Iterable<VideoOrder> input, Collector<VideoOrder> out) throws Exception {
                            List<VideoOrder> list = IteratorUtils.toList(input.iterator());
                            int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
                            VideoOrder videoOrder = new VideoOrder();
                            videoOrder.setMoney(total);
                            videoOrder.setCreateTime(list.get(0).getCreateTime());
                            videoOrder.setTitle(list.get(0).getTitle());
                            out.collect(videoOrder);
                        }
             });
    
  • 全窗口函数apply

    apply(new WindowFunction(){ })
    
    • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算

    • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

      IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
      WindowFunction<IN, OUT, KEY, W extends Window>
      
    • 案例实战

      SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                  @Override
                  public String getKey(VideoOrder value) throws Exception {
                      return value.getTitle();
                  }
              }).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                      .apply( new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                          @Override
                          public void apply(String key, TimeWindow window, Iterable<VideoOrder> input, Collector<VideoOrder> out) throws Exception {
      
                              List<VideoOrder> list = IteratorUtils.toList(input.iterator());
      
                              int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
                              VideoOrder videoOrder = new VideoOrder();
                              videoOrder.setMoney(total);
                              videoOrder.setCreateTime(list.get(0).getCreateTime());
                              videoOrder.setTitle(list.get(0).getTitle());
                              out.collect(videoOrder);
                          }
               });
      
  • 全窗口函数process

    process(new ProcessWindowFunction(){})
    
    • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算

    • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

      IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
      ProcessWindowFunction<IN, OUT, KEY, W extends Window>
      
    • 案例实战

      SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                  @Override
                  public String getKey(VideoOrder value) throws Exception {
                      return value.getTitle();
                  }
              }).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                      .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                          @Override
                          public void process(String key, Context context, Iterable<VideoOrder> elements, Collector<VideoOrder> out) throws Exception {
                              List<VideoOrder> list = IteratorUtils.toList(elements.iterator());
      
                              int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
                              VideoOrder videoOrder = new VideoOrder();
                              videoOrder.setMoney(total);
                              videoOrder.setTitle(list.get(0).getTitle());
                              videoOrder.setCreateTime(list.get(0).getCreateTime());
      
                              out.collect(videoOrder);
                          }
                      });
      

      process函数比apply强。