Flink学习-广播状态

什么是广播状态

Broadcast State是Flink支持的一种Operator State。使用Broadcast State,可以在Flink程序的一个Stream中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个Task中,使得这些数据记录能够为所有的Task所共享,比如一些用于配置的数据记录。这样,每个Task在处理其所对应的Stream中记录的时候,读取这些配置,来满足实际数据处理需要。
另外,在一定程度上,Broadcast State能够使得Flink Job在运行过程中与外部的其他系统解耦合。比如,通常Flink会使用YARN来管理计算资源,使用Broadcast State就可以不用直接连接MySQL数据库读取相关配置信息了,也无需对MySQL做额外的授权操作。因为在一些场景下,会使用Flink on YARN部署模式,将Flink Job运行的资源申请和释放交给YARN去管理,那么就存在Hadoop集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如MySQL等进行连接操作授权,如果忘记对MysQL访问授权,Flink Job被调度到新增的某个新增节点上连接并读取MySQL配置信息就会出错。

广播简单例子

连接ALERT_RULE广播,接收到字符串为A或B或C的猜输出,这里ALERT_RULE写死了strings集合,正常来说应该是一个数据源。

 final static MapStateDescriptor<String, String> ALERT_RULE = new MapStateDescriptor<>(
            "alert_rule",
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO);


    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        List<String> strings = Arrays.asList("A", "B", "C");

        env.socketTextStream("127.0.0.1", 9200)
                .connect(env.fromCollection(strings).broadcast(ALERT_RULE))
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE);
                        if (broadcastState.contains(value)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE);
                        broadcastState.put(value, value);
                    }
                })
                .print();

        env.execute();
    }
全部评论

相关推荐

点赞 1 评论
分享
牛客网
牛客企业服务