Flink操练(二十六)之map求平均值
1、代码逻辑实现
package day02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * Demo job: emits a random integer in [0, 10) once per second, keeps a running
 * (sum, count) per key with {@code reduce}, and prints sum, count and sum/count.
 *
 * <p>Note: the stream is keyed by the integer value itself, so every key only
 * ever aggregates identical numbers and the printed average always equals the key.
 *
 * @program: Flink_learn
 * @description: integer average
 * @author: Mr.逗
 * @create: 2021-09-17 10:51
 **/
public class Avg {

    /**
     * Builds the pipeline and runs it under the job name {@code Avg.class.getName()}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> streamSource = env.addSource(new SourceFunction<Integer>() {
            // volatile: cancel() writes this flag while run() keeps reading it in its loop.
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                while (isRunning) {
                    ctx.collect(random.nextInt(10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // Convert each integer into an (integer, count = 1) pair.
        // The lambda erases the Tuple2 type arguments, hence the explicit returns(...) hint.
        DataStream<Tuple2<Integer, Integer>> map = streamSource
                .map(v -> Tuple2.of(v, 1))
                .returns(Types.TUPLE(Types.INT, Types.INT));

        // Rolling per-key aggregation: f0 accumulates the sum, f1 accumulates the count.
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> reduce = map
                .keyBy(v -> v.f0)
                .reduce((Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) ->
                        Tuple2.of(v1.f0 + v2.f0, v1.f1 + v2.f1));

        // 1. Explicit MapFunction implementation (type information comes from the class signature).
        reduce.map(new MapFunction<Tuple2<Integer, Integer>, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(Tuple2<Integer, Integer> v) throws Exception {
                return Tuple3.of("数字" + v.f0 + "的个数是:", v.f1 + "其中平均值是:", (double) v.f0 / v.f1);
            }
        }).map(v -> v.f0 + v.f1 + v.f2).print();

        // 2. Lambda implementation (needs an explicit returns(...) type hint).
        DataStream<Tuple3<String, String, Double>> map1 = reduce
                .map(v -> Tuple3.of("数字" + v.f0 + "的个数是:", v.f1 + "其中平均值是:", (double) v.f0 / v.f1))
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.DOUBLE));
        map1.map(v -> v.f0 + v.f1 + v.f2).print();

        // Both branches print, so every result line shows up twice in the output.
        String name = Avg.class.getName();
        env.execute(name);
    }
}

2、结果之展示
4> 数字18的个数是:2其中平均值是:9.0 4> 
数字18的个数是:2其中平均值是:9.0 2> 数字12的个数是:2其中平均值是:6.0 2> 数字12的个数是:2其中平均值是:6.0 4> 数字27的个数是:3其中平均值是:9.0 4> 数字27的个数是:3其中平均值是:9.0