利用Flink对数据进行分流,分阶处理

Storm迁移flink主要问题:

  1. Storm通过自定义的Bolt类实现自己的业务逻辑,如何在flink中实现

通过flink的ProcessFuction类实现,可以通过继承该类,在processElement方法中实现自己的业务逻辑。

  1. Storm按照业务类型分发数据处理的逻辑,如何在flink中实现

通过flink的旁路输出特性实现,对原始的数据流按照某些分类标准分类,输出到不同的子数据流中处理。

总体处理流程:

  1. Flink从Kafka中读取数据,作为初始数据流initDataStream;

  2. 对初始数据流进行分类处理,将子数据输出到数据流dataStream1、dataStream2、......、dataStreamn中;

  3. 按照自己的业务逻辑,对子数据流dataStream1_1、dataStream2_1、......、dataStreamn_1处理,得到dataStream1_2、dataStream2_2、......、dataStreamn_2,经过多次处理,最终得到dataStream1_x、dataStream2_y、......、dataStreamn_z

  4. 最后利用SinkFuction进行落地处理

例:利用flink对北京、上海两城市民信息进行过滤,过滤掉工资小于1000和大于20000的数据,然后按姓名_城市_收入的格式输出

输入:个人信息{姓名、年龄、性别、城市、收入}

输出:姓名_城市_收入,如:a_b_100

Person类:

public class Person {

    private String name;
    private Integer age;
    private Sex sex;
    private Integer salary;
    private Country country;
    //getter、setter...
}

enum Sex {
    Male,
    Female
}

enum Country {
    Beijing,
    Shanghai
}

生产者程序:

public class FlinkProducer {

    //kafka配置文件
    private static final Properties properties;

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //连接kafka
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), properties);
        //初始化数据源
        String[] initData = getData();
        DataStreamSource<String> source = env.fromElements(initData);
        //发送数据
        source.addSink(flinkKafkaProducer);
        //执行flink程序
        env.execute();
    }

    //模拟数据源
    private static String[] getData() {
        String[] data = new String[100];
        for (int i = 0; i < 100; i++) {
            Person person = new Person();
            person.setName("name");
            person.setAge(18);
            //设置性别
            int sex = (int) (Math.random() * 100);
            if (sex % 2 == 0) {
                person.setSex(Sex.Female);
            } else {
                person.setSex(Sex.Male);
            }
            //设置城市
            int country = (int) (Math.random() * 100);
            if (country % 2 == 0) {
                person.setCountry(Country.Beijing);
            } else {
                person.setCountry(Country.Shanghai);
            }
            //设置收入
            person.setSalary((int) (Math.random() * 50000));
            data[i] = JsonUtils.serialize(person);
        }
        return data;
    }

}

消费者程序:

public class FlinkConsumer {

    //kafka配置文件
    private static final Properties properties;

    //男性标签
    private static final OutputTag<String> MALE = new OutputTag<>("male", Types.STRING);

    //女性标签
    private static final OutputTag<String> FEMALE = new OutputTag<>("female", Types.STRING);

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.flink连接kafka并读取数据源
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties);
        //3.获取初始数据源
        DataStreamSource<String> initDataStream = env.addSource(flinkKafkaConsumer);
        //4.对数据源进行分类处理
        SingleOutputStreamOperator<String> process = initDataStream.process(new SexSplitBolt());
        //5.分别获取仅仅包含男性、女性数据的数据流
        DataStream<String> dataStream_male = process.getSideOutput(MALE);
        DataStream<String> dataStream_female = process.getSideOutput(FEMALE);
        //6.过滤掉工资小于1000和大于20000的数据
        SingleOutputStreamOperator<String> dataStream_male_1 = dataStream_male.process(new SalarySplitBolt());
        SingleOutputStreamOperator<String> dataStream_female_1 = dataStream_female.process(new SalarySplitBolt());
        //7.给每个人的工资增加20%
        SingleOutputStreamOperator<String> dataStream_male_1_2 = dataStream_male_1.process(new SalaryAddBolt());
        SingleOutputStreamOperator<String> dataStream_female_1_2 = dataStream_female_1.process(new SalaryAddBolt());
        //8.进行落地处理,输出处理结果到控制台中
        dataStream_male_1_2.addSink(new OutputBolt());
        dataStream_female_1_2.addSink(new OutputBolt());
        //9.执行flink程序
        env.execute();
    }

    static class SexSplitBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSex() == Sex.Male) {
                context.output(MALE, s);
            } else {
                context.output(FEMALE, s);
            }
        }
    }

    static class SalarySplitBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSalary() < 1000 || person.getSalary() > 20000) {
                return;
            }
            collector.collect(s);
        }
    }

    static class SalaryAddBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            person.setSalary((int) (person.getSalary() * 1.2));
            collector.collect(s);
        }
    }

    static class OutputBolt implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            Person person = JsonUtils.deserialize(value, Person.class);
            System.out.println("已处理数据:" + person.getName() + "_" + person.getCountry() + "_" + person.getSalary());
        }
    }

}

控制台输出:
已处理数据:name_Beijing_9320

已处理数据:name_Beijing_1049

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_16721

已处理数据:name_Beijing_3638

已处理数据:name_Shanghai_18862

已处理数据:name_Shanghai_8183

已处理数据:name_Shanghai_18627

已处理数据:name_Shanghai_16510

已处理数据:name_Beijing_18983

已处理数据:name_Beijing_10734

已处理数据:name_Shanghai_1594

已处理数据:name_Beijing_3640

已处理数据:name_Shanghai_17757

已处理数据:name_Shanghai_16700

已处理数据:name_Beijing_1175

已处理数据:name_Beijing_11788

已处理数据:name_Shanghai_15732

已处理数据:name_Beijing_15137

已处理数据:name_Beijing_12853

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_1188

已处理数据:name_Shanghai_3202

已处理数据:name_Shanghai_3986

已处理数据:name_Shanghai_8193

已处理数据:name_Beijing_16093

已处理数据:name_Beijing_8274

已处理数据:name_Beijing_3630

已处理数据:name_Beijing_11962

已处理数据:name_Beijing_14441

已处理数据:name_Shanghai_5312

已处理数据:name_Beijing_1815

已处理数据:name_Shanghai_11287

已处理数据:name_Beijing_5848

已处理数据:name_Beijing_11056

已处理数据:name_Shanghai_12407

已处理数据:name_Shanghai_12652

已处理数据:name_Beijing_19864

已处理数据:name_Shanghai_18935

已处理数据:name_Shanghai_16894

已处理数据:name_Shanghai_9732

36570 [TaskExecutorLocalStateStoresManager shutdown hook] INFO o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.


#笔经#
全部评论
这些是银行开发需要掌握的知识吗?
点赞 回复 分享
发布于 2022-03-06 10:52
学到了,感谢楼主
点赞 回复 分享
发布于 2022-03-04 13:45

相关推荐

Beeee0927:是缅甸园区吗
点赞 评论 收藏
分享
不愿透露姓名的神秘牛友
07-11 17:10
什么素质,我请问呢,要掉小珍珠了。。。又憋屈又生气
Steven267:这不喷回去?花钱是大爷,记住这个道理
点赞 评论 收藏
分享
不愿透露姓名的神秘牛友
06-11 13:34
offe从四面八方来:我真的没时间陪你闹了
点赞 评论 收藏
分享
来个大佬救一下,为上投了都是石沉大海了,没实习经历的话怕秋招直接进不了面。什么实习这么难找,基本
心态爆炸了:现在正式的岗位都少,实习基本不咋招的,除了大厂,中小企业其实没那么多岗位需求,就算是有,大多都是招一两个廉价劳动力,同时,他们也会希望你一来就能干活的,没时间培训你,就让你了解公司的项目,你了解完就可以开始干活。再者是,很多低质量的实习其实用处没有那么大的。我去年也是找实习找到破防,最后去了一家深圳的小公司实习,工作对我来说很简单,甚至不如我在学校做的项目,秋招的时候,这段实习经历也并没有帮上什么忙,投递简历,依旧非常低的回复率。低回复率是常态,尤其是找实习,找不到,那就把重心放在优化自己的简历和项目,多看八股文,锻炼自己的面试能力,多看别人的面经,自己模拟面试,等秋招的时候,只要有那么寥寥几次,好好抓住那几次机会。
点赞 评论 收藏
分享
评论
2
2
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务