Flink旁路输出简单实例:根据股价区分股票类型并写出到文件
关于旁路输出的官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/side_output/
除了由 DataStream 操作产生的主要流之外,我们还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。
使用旁路输出时,首先需要定义用于标识旁路输出流的OutputTag类对象。
构造方法的第一个参数表示一个区分旁路输出流的id标识,第二个参数表示要处理的数据类型。
OutputTag<String> outputTag = new OutputTag<String>("side-output", Types.STRING);
定义旁路输出标签后,通过主输出流的process方法,把数据发送到旁路输出流中。
SingleOutputStreamOperator<String> process = input .process(new ProcessFunction<String, Object>() { @Override public void processElement( String value, Context ctx, Collector<Object> collector) throws Exception { // 发送数据到主要的输出 collector.collect(value); // 发送数据到旁路输出 ctx.output(outputTag, "sideout-" + value); } });
我们可以在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流,这将产生一个与旁路输出流结果类型一致的 DataStream。
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
以股票为例,在发送股票数据时,我们假设股价小于50就是低价股,否则就是高级股,在发送股票数据时,我们希望把高价股和低价股分别写出到不同的文件中保存起来。
首先,创建一个股票类Stock:
public class Stock { //股票名称 private String name; //股票价格 private Integer price; //构造方法、getter、setter、toString方法在此省略 }
其次,编写Flink消费者程序:
public class FlinkKafkaConsumer{ public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.配置文件 Properties props = new Properties(); props.put("bootstrap.servers","Kafka集群地址"); //3.构造消费者 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), props); //4.配置消费者 DataStreamSource stream = env.addSource(consumer); //5.data sick stream.addSick(new SinkFunction<String>(){ @Override public void invoke(String value, Context context){ Syestem.out.println("当前已处理的数据:" + JsonUtils.deserialize(value, Stock.class)); } }); //6.执行程序 env.execute("消费者程序"); } }
接着,编写生产者程序:
public class FlinkProducer { public static void main(String[] args) throws Exception { //结果输出路径 String outputPath1 = "C:\\Users\\xxx\\Desktop\\result1"; String outputPath2 = "C:\\Users\\xxx\\Desktop\\result2"; //创建数据源 ArrayList<String> stocks = new ArrayList<>(); for (int i = 0; i < 100 ; i++) { stocks.add(JsonUtils.serialize(new Stock("stock-" + i, (int)(Math.random() * 100)))); } //创建消费者环境 StreamExecutionEnvironment producerEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); //配置文件 Properties props = new Properties(); props.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); //构建生产者 FlinkKafkaProducer<String> producer_all = new FlinkKafkaProducer<>("STOCK", new SimpleStringSchema(), props); //配置数据源 DataStreamSource<String> stream_all = producerEnvironment.fromCollection(stocks); //创建旁路输出 final OutputTag<String> outputTag_lowPrice = new OutputTag<>("STOCK_LOW_PRICE", Types.STRING); final OutputTag<String> outputTag_highPrice = new OutputTag<>("STOCK_HIGH_PRICE", Types.STRING); //配置旁路输出 SingleOutputStreamOperator<Object> process = stream_all.process(new ProcessFunction<String, Object>() { @Override public void processElement(String s, ProcessFunction<String, Object>.Context context, Collector<Object> collector) { Stock stock = JsonUtils.deserialize(s, Stock.class); collector.collect(s); if (stock.getPrice() < 50) { context.output(outputTag_lowPrice, s); } else { context.output(outputTag_highPrice, s); } } }); //获取低价股票和高价股票的旁路输出 DataStream<String> stream_lowPrice = process.getSideOutput(outputTag_lowPrice); DataStream<String> stream_highPrice = process.getSideOutput(outputTag_highPrice); //配置生产者 stream_all.addSink(producer_all); //处理低价股票 stream_lowPrice.writeAsText(outputPath1, FileSystem.WriteMode.OVERWRITE); //处理高价股票 stream_highPrice.writeAsText(outputPath2, FileSystem.WriteMode.OVERWRITE); //执行flink程序 producerEnvironment.execute("生产者流处理"); } }
依次启动消费者程序、生产者程序,观察消费者程序控制台中的输出:
此时,桌面生成了两个文件夹,result1记录了小于50的股票,result2相反: