Flink旁路输出简单实例:根据股价区分股票类型并写出到文件

关于旁路输出的官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/side_output/

除了由 DataStream 操作产生的主要流之外,我们还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。

使用旁路输出时,首先需要定义用于标识旁路输出流的OutputTag类对象。
构造方法的第一个参数表示一个区分旁路输出流的id标识,第二个参数表示要处理的数据类型。

OutputTag<String> outputTag = new OutputTag<String>("side-output", Types.STRING);

定义旁路输出标签后,通过主输出流的process方法,把数据发送到旁路输出流中。

SingleOutputStreamOperator<String> process = input
  .process(new ProcessFunction<String, Object>() {

      @Override
      public void processElement(
          String value,
          Context ctx,
          Collector<Object> collector) throws Exception {
        // 发送数据到主要的输出
        collector.collect(value);
        // 发送数据到旁路输出
        ctx.output(outputTag, "sideout-" + value);
      }
    });

我们可以在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流,这将产生一个与旁路输出流结果类型一致的 DataStream。

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

以股票为例,在发送股票数据时,我们假设股价小于50就是低价股,否则就是高级股,在发送股票数据时,我们希望把高价股和低价股分别写出到不同的文件中保存起来。

首先,创建一个股票类Stock:

public class Stock {
    //股票名称
    private String name;
    //股票价格
    private Integer price;
    //构造方法、getter、setter、toString方法在此省略
}

其次,编写Flink消费者程序:

public class FlinkKafkaConsumer{

    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers","Kafka集群地址");
        //3.构造消费者
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), props);
        //4.配置消费者
        DataStreamSource stream = env.addSource(consumer);
        //5.data sick
        stream.addSick(new SinkFunction<String>(){
            @Override
            public void invoke(String value, Context context){
                Syestem.out.println("当前已处理的数据:" + JsonUtils.deserialize(value, Stock.class));
            }
        });
        //6.执行程序
        env.execute("消费者程序");
    }
}

接着,编写生产者程序:

​public class FlinkProducer {

    public static void main(String[] args) throws Exception {
        //结果输出路径
        String outputPath1 = "C:\\Users\\xxx\\Desktop\\result1";
        String outputPath2 = "C:\\Users\\xxx\\Desktop\\result2";
        //创建数据源
        ArrayList<String> stocks = new ArrayList<>();
        for (int i = 0; i < 100 ; i++) {
            stocks.add(JsonUtils.serialize(new Stock("stock-" + i, (int)(Math.random() * 100))));
        }
        //创建消费者环境
        StreamExecutionEnvironment producerEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //构建生产者
        FlinkKafkaProducer<String> producer_all = new FlinkKafkaProducer<>("STOCK", new SimpleStringSchema(), props);
        //配置数据源
        DataStreamSource<String> stream_all = producerEnvironment.fromCollection(stocks);
        //创建旁路输出
        final OutputTag<String> outputTag_lowPrice = new OutputTag<>("STOCK_LOW_PRICE", Types.STRING);
        final OutputTag<String> outputTag_highPrice = new OutputTag<>("STOCK_HIGH_PRICE", Types.STRING);
        //配置旁路输出
        SingleOutputStreamOperator<Object> process = stream_all.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String s, ProcessFunction<String, Object>.Context context, Collector<Object> collector) {
                Stock stock = JsonUtils.deserialize(s, Stock.class);
                collector.collect(s);
                if (stock.getPrice() < 50) {
                    context.output(outputTag_lowPrice, s);
                } else {
                    context.output(outputTag_highPrice, s);
                }
            }
        });
        //获取低价股票和高价股票的旁路输出
        DataStream<String> stream_lowPrice = process.getSideOutput(outputTag_lowPrice);
        DataStream<String> stream_highPrice = process.getSideOutput(outputTag_highPrice);
        //配置生产者
        stream_all.addSink(producer_all);
        //处理低价股票
        stream_lowPrice.writeAsText(outputPath1, FileSystem.WriteMode.OVERWRITE);
        //处理高价股票
        stream_highPrice.writeAsText(outputPath2, FileSystem.WriteMode.OVERWRITE);
        //执行flink程序
        producerEnvironment.execute("生产者流处理");
    }

}

依次启动消费者程序、生产者程序,观察消费者程序控制台中的输出:
在这里插入图片描述
此时,桌面生成了两个文件夹,result1记录了小于50的股票,result2相反:
在这里插入图片描述

#学习路径#
全部评论
工资多少
点赞 回复 分享
发布于 2022-02-25 14:48

相关推荐

学java时间比较短不到三个月,基本的技术栈都过了一遍就是都不太深,有个小项目。是继续找实习还是沉淀准备秋招呢?找实习的话会花很多时间在八股,放弃的话又怕秋招简历太难看。有无大佬支招
今天java了吗:1.一定要找实习,实习不一定要去,但是找实习过程中的面试经验和心态经验才是最重要的 2.八股本来就是大头,甚至比项目重要 3.这个时间段也是面试比较多的阶段,可以抓住机会锻炼。面试才会发现自己的不足,感觉自己会了和能给面试官娓娓道来是两码事
点赞 评论 收藏
分享
05-24 14:12
门头沟学院 Java
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务