利用Flink对数据进行分流,分阶处理
Storm迁移flink主要问题:
- Storm通过自定义的Bolt类实现自己的业务逻辑,如何在flink中实现
通过flink的ProcessFuction类实现,可以通过继承该类,在processElement方法中实现自己的业务逻辑。
- Storm按照业务类型分发数据处理的逻辑,如何在flink中实现
通过flink的旁路输出特性实现,对原始的数据流按照某些分类标准分类,输出到不同的子数据流中处理。
总体处理流程:
Flink从Kafka中读取数据,作为初始数据流initDataStream;
对初始数据流进行分类处理,将子数据输出到数据流dataStream1、dataStream2、......、dataStreamn中;
按照自己的业务逻辑,对子数据流dataStream1_1、dataStream2_1、......、dataStreamn_1处理,得到dataStream1_2、dataStream2_2、......、dataStreamn_2,经过多次处理,最终得到dataStream1_x、dataStream2_y、......、dataStreamn_z
最后利用SinkFuction进行落地处理
例:利用flink对北京、上海两城市民信息进行过滤,过滤掉工资小于1000和大于20000的数据,然后按姓名_城市_收入的格式输出
输入:个人信息{姓名、年龄、性别、城市、收入}
输出:姓名_城市_收入,如:a_b_100
Person类:
public class Person { private String name; private Integer age; private Sex sex; private Integer salary; private Country country; //getter、setter... } enum Sex { Male, Female } enum Country { Beijing, Shanghai }
生产者程序:
public class FlinkProducer { //kafka配置文件 private static final Properties properties; static { //将kafka地址放入配置文件 properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); } public static void main(String[] args) throws Exception { //创建flink环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接kafka FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), properties); //初始化数据源 String[] initData = getData(); DataStreamSource<String> source = env.fromElements(initData); //发送数据 source.addSink(flinkKafkaProducer); //执行flink程序 env.execute(); } //模拟数据源 private static String[] getData() { String[] data = new String[100]; for (int i = 0; i < 100; i++) { Person person = new Person(); person.setName("name"); person.setAge(18); //设置性别 int sex = (int) (Math.random() * 100); if (sex % 2 == 0) { person.setSex(Sex.Female); } else { person.setSex(Sex.Male); } //设置城市 int country = (int) (Math.random() * 100); if (country % 2 == 0) { person.setCountry(Country.Beijing); } else { person.setCountry(Country.Shanghai); } //设置收入 person.setSalary((int) (Math.random() * 50000)); data[i] = JsonUtils.serialize(person); } return data; }
}
消费者程序:
public class FlinkConsumer { //kafka配置文件 private static final Properties properties; //男性标签 private static final OutputTag<String> MALE = new OutputTag<>("male", Types.STRING); //女性标签 private static final OutputTag<String> FEMALE = new OutputTag<>("female", Types.STRING); static { //将kafka地址放入配置文件 properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); } public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.flink连接kafka并读取数据源 FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties); //3.获取初始数据源 DataStreamSource<String> initDataStream = env.addSource(flinkKafkaConsumer); //4.对数据源进行分类处理 SingleOutputStreamOperator<String> process = initDataStream.process(new SexSplitBolt()); //5.分别获取仅仅包含男性、女性数据的数据流 DataStream<String> dataStream_male = process.getSideOutput(MALE); DataStream<String> dataStream_female = process.getSideOutput(FEMALE); //6.过滤掉工资小于1000和大于20000的数据 SingleOutputStreamOperator<String> dataStream_male_1 = dataStream_male.process(new SalarySplitBolt()); SingleOutputStreamOperator<String> dataStream_female_1 = dataStream_female.process(new SalarySplitBolt()); //7.给每个人的工资增加20% SingleOutputStreamOperator<String> dataStream_male_1_2 = dataStream_male_1.process(new SalaryAddBolt()); SingleOutputStreamOperator<String> dataStream_female_1_2 = dataStream_female_1.process(new SalaryAddBolt()); //8.进行落地处理,输出处理结果到控制台中 dataStream_male_1_2.addSink(new OutputBolt()); dataStream_female_1_2.addSink(new OutputBolt()); //9.执行flink程序 env.execute(); } static class SexSplitBolt extends ProcessFunction<String, String> { @Override public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) { Person person = JsonUtils.deserialize(s, Person.class); if (person.getSex() == Sex.Male) { context.output(MALE, s); } else { context.output(FEMALE, s); } } } static class SalarySplitBolt extends ProcessFunction<String, String> { @Override public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) { Person person = JsonUtils.deserialize(s, Person.class); if (person.getSalary() < 1000 || person.getSalary() > 20000) { return; } collector.collect(s); } } static class SalaryAddBolt extends ProcessFunction<String, String> { @Override public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) { Person person = JsonUtils.deserialize(s, Person.class); person.setSalary((int) (person.getSalary() * 1.2)); collector.collect(s); } } static class OutputBolt implements SinkFunction<String> { @Override public void invoke(String value, Context context) { Person person = JsonUtils.deserialize(value, Person.class); System.out.println("已处理数据:" + person.getName() + "_" + person.getCountry() + "_" + person.getSalary()); } } }
控制台输出:
已处理数据:name_Beijing_9320
已处理数据:name_Beijing_1049
已处理数据:name_Beijing_11548
已处理数据:name_Shanghai_16721
已处理数据:name_Beijing_3638
已处理数据:name_Shanghai_18862
已处理数据:name_Shanghai_8183
已处理数据:name_Shanghai_18627
已处理数据:name_Shanghai_16510
已处理数据:name_Beijing_18983
已处理数据:name_Beijing_10734
已处理数据:name_Shanghai_1594
已处理数据:name_Beijing_3640
已处理数据:name_Shanghai_17757
已处理数据:name_Shanghai_16700
已处理数据:name_Beijing_1175
已处理数据:name_Beijing_11788
已处理数据:name_Shanghai_15732
已处理数据:name_Beijing_15137
已处理数据:name_Beijing_12853
已处理数据:name_Beijing_11548
已处理数据:name_Shanghai_1188
已处理数据:name_Shanghai_3202
已处理数据:name_Shanghai_3986
已处理数据:name_Shanghai_8193
已处理数据:name_Beijing_16093
已处理数据:name_Beijing_8274
已处理数据:name_Beijing_3630
已处理数据:name_Beijing_11962
已处理数据:name_Beijing_14441
已处理数据:name_Shanghai_5312
已处理数据:name_Beijing_1815
已处理数据:name_Shanghai_11287
已处理数据:name_Beijing_5848
已处理数据:name_Beijing_11056
已处理数据:name_Shanghai_12407
已处理数据:name_Shanghai_12652
已处理数据:name_Beijing_19864
已处理数据:name_Shanghai_18935
已处理数据:name_Shanghai_16894
已处理数据:name_Shanghai_9732
36570 [TaskExecutorLocalStateStoresManager shutdown hook] INFO o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
#笔经#