跳至主要內容

Flink窗口及增量聚合

xw大约 5 分钟大数据JAVAFlink

概述

数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等,Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算。

分类

  • time Window 时间窗口,即按照一定的时间规则作为窗口统计
    • time-tumbling-window 时间滚动窗口
    • time-sliding-window 时间滑动窗口
    • session WIndow 会话窗口
  • count Window 数量窗口

介绍

  • 滑动窗口,窗口具有固定大小,窗口数据有重叠

  • 滚动窗口,窗口具有固定大小,窗口数据不重叠

  • tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
  • sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据

API

  • 有keyBy 用 window() api,没keyBy 用 windowAll() api ,并行度低

  • 窗口分配器 Window Assigners

    • 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window窗口上
    • window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner
  • 窗口触发器 trigger

    • 用来控制一个窗口是否需要被触发
    • 每个 窗口分配器WindowAssigner 都有一个默认触发器,也支持自定义触发器
  • 窗口 window function ,对窗口内的数据做啥?

    • 定义了要对窗口中收集的数据做的计算操作

    • 增量聚合函数

      aggregate(agg函数,WindowFunction(){  })
      
      • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中

      • 常见的增量聚合函数有 reduceFunction、aggregateFunction

      • min、max、sum 都是简单的聚合操作,不需要自定义规则

        AggregateFunction<IN, ACC, OUT>
        IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
        
    • 全窗口函数

      apply(new processWindowFunction(){ })
      
      • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
      • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)
      IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
      WindowFunction<IN, OUT, KEY, W extends Window>
      
    • 如果想处理每个元素更底层的API的时候用

      //对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter
      process(new KeyedProcessFunction(){processElement、onTimer})
      

使用

  • 滚动窗口

    public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
    
            KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            });
            //每5秒钟滚动一次
            DataStream<VideoOrder> sumDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
    
            sumDS.print();
    
            //DataStream需要调用execute,可以取个名称
            env.execute("tumbling window job");
        }
    
  • 滑动窗口

    public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
    
            KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            });
            
            //每5秒统计一次最近20秒内的总和
            DataStream<VideoOrder> sumDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money");
    
            sumDS.print();
    
            //DataStream需要调用execute,可以取个名称
            env.execute("sliding window job");
        }
    
  • 数量窗口

    public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
    
            KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            });
    
            //分组后的组内数据超过5个则触发
            //DataStream<VideoOrder> sumDS = keyByDS.countWindow(5).sum("money");
    
            //分组后的组内数据超过3个则触发统计过去的5个数据
            DataStream<VideoOrder> sumDS = keyByDS.countWindow(5,3).sum("money");
    
            sumDS.print();
    
            //DataStream需要调用execute,可以取个名称
            env.execute("sliding window job");
        }
    
  • 增量聚合函数

        public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
    
            KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            });
    
            SingleOutputStreamOperator<VideoOrder> aggregate = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {
                //初始化累加器
                @Override
                public VideoOrder createAccumulator() {
                    VideoOrder videoOrder = new VideoOrder();
                    return videoOrder;
                }
    
                //聚合操作
                @Override
                public VideoOrder add(VideoOrder videoOrder, VideoOrder videoOrder2) {
                    videoOrder2.setMoney(videoOrder.getMoney() + videoOrder2.getMoney());
                    videoOrder2.setTitle(videoOrder.getTitle());
                    if (videoOrder2.getCreateTime() == null) {
                        videoOrder2.setCreateTime(videoOrder.getCreateTime());
                    }
                    return videoOrder2;
                }
    
                // 获取结果
                @Override
                public VideoOrder getResult(VideoOrder videoOrder) {
                    return videoOrder;
                }
    
                //合并
                @Override
                public VideoOrder merge(VideoOrder videoOrder, VideoOrder acc1) {
                    return null;
                }
            });
    
            aggregate.print();
    
            //DataStream需要调用execute,可以取个名称
            env.execute("tumbling window job");
        }
    
  • 全窗口函数

    public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
    
            KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            });
    
            SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
                        @Override
                        public String getKey(VideoOrder value) throws Exception {
                            return value.getTitle();
                        }
                    }).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                    .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                        @Override
                        public void process(String key, Context context, Iterable<VideoOrder> elements, Collector<VideoOrder> out) throws Exception {
                            List<VideoOrder> list = IteratorUtils.toList(elements.iterator());
    
                            int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
                            VideoOrder videoOrder = new VideoOrder();
                            videoOrder.setMoney(total);
                            videoOrder.setTitle(list.get(0).getTitle());
                            videoOrder.setCreateTime(list.get(0).getCreateTime());
                            out.collect(videoOrder);
                        }
                    });
            aggDS.print();
            //DataStream需要调用execute,可以取个名称
            env.execute("tumbling window job");
        }