京东技术解密之配置中心DUCC
一、使用方法
简单说下DUCC的特点
支持多环境(或称分组),分组可以合并
内置强大的基于插件的数据绑定框架,支持多种类型等转换;
支持Log4j、Log4j2、Logback的动态修改日记级别功能。
支持Spring原生注解、支持自定义注解,客户端代码入侵性低
支持客户端多配置源,支持自定义配置,如ZK、Consol扩展
支持配置预案切换
接下来说说怎么用,下面我代码中的ConfiguratorManager就是DUCC的配置管理类
@Configuration
@Log4j2
@Order(value = 0)
public class Config extends PropertyPlaceholderConfigurer {
//发布系统的配置
private static Map<String,String> joneProperty = new HashMap<>();
//DUCC系统的配置
private static Map<String,String> duccProperty = new HashMap<>();
@Bean(initMethod = "start" , destroyMethod = "stop")
public ConfiguratorManager configuratorManager()
{
ConfiguratorManager configuratorManager = ConfiguratorManager.getInstance() ;
configuratorManager.setApplication(getJoneProperty("laf.config.manager.application"));
configuratorManager.addResource(new Resource("ucc",getJoneProperty("laf.config.manager.uri")));
configuratorManager.addListener(new ConfigurationListener.CustomConfigurationListener("ucc") {
@Override
public void onUpdate(com.jd.laf.config.Configuration configuration) {
List<Property> properties = configuration.getProperties();
for (Property property:properties) {
log.info("duccConfig update key:{}",property.getKey());
duccProperty.put(property.getKey(), String.valueOf(property.getValue()));
}
}
});
return configuratorManager ;
}
public String getDuccProperty(String key)
{
if(duccProperty.containsKey(key)) {
return duccProperty.get(key);
}
Property property = configuratorManager().getProperty(key);
if(property==null || String.valueOf(property.getValue()).isEmpty()) {
log.error("配置降级,key:{}",key);
return getJoneProperty(key);
}
duccProperty.put(key, String.valueOf(property.getValue()));
return String.valueOf(property.getValue());
}
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactory, Properties props)throws BeansException {
super.processProperties(beanFactory, props);
for (Object key : props.keySet()) {
String keyStr = key.toString();
joneProperty.put(keyStr, String.valueOf(props.getProperty(keyStr)));
}
}
public String getJoneProperty(String key)
{
return joneProperty.get(key);
}
}复制代码
二、DUCC各重要模块解读
1、ConfiguratorManager统一配置管理类
主要步骤:
1.1 从服务器获取配置信息
1.2 将配置保存到本地
1.3 启动更新事件消费者
1.4 为每个Resource(可以理解为分组)启动变更监控线程Watcher,监听配置变更
public class ConfiguratorManager implements ConfigurationSupplier, Watchable {
//通知
protected Notifier notifier = new Notifier() {
@Override
public <M, T extends Listener> void send(M property, List<T> listeners) {
inform(property, listeners);
}
};
//事件
protected BlockingQueue<Resource> events = new ArrayBlockingQueue<Resource>(5000);
@Override
public Property getProperty(final String key) {
//最终是通过configuration获取的,后面会讲到
return configuration.getProperty(name);
}
@Override
public boolean addListener(final PropertyListener listener) {
//判断是否重复添加
groupResources.addListener(listener.getKey(), listener);
}
/**
* 通知***,notify执行send方法时会触发
*
* @param property 变更的配置
* @param listeners ***
*/
protected <M, T extends Listener> void inform(final M property, final List<T> listeners) {
if (listeners == null) {
return;
}
for (final T listener : listeners) {
if (!isStarted()) {
return;
}
notifierThreads.execute(new Runnable() {
@Override
public void run() {
if (isStarted()) {
listener.onUpdate(property);
}
}
});
}
}
/**
* 启动
*/
public void start() throws Exception {
//验证降级文件路径可写等
validate();
synchronized (mutex) {
if (started.compareAndSet(false, true)) {
ExecutorService executorService = null;
try {
//资源URL解析,排序合并
initializeResource()
resource.setConfigurator(configurator)
//远程加载,初始化上下文context
startRemote(resources, executorService)
//通知,最后还是会执行到notifier.send()方法
inform(groupResource, groupConfiguration);
//启动变更监听
consumer = new Thread(new EventConsumer());
consumer.start();
//所有资源已经初始化过,安全启动***
//支持多个分组,推荐重复度高的单独抽出来
for (Resource resource : groupResources) {
if (resource.isReady()) {
resource.watch();
}
}
} catch (Exception e) {
stop();
throw e;
} finally {
if (executorService != null) {
executorService.shutdownNow();
}
}
}
}
}
protected class EventConsumer implements Runnable {
@Override
public void run() {
while (isStarted() && !Thread.currentThread().isInterrupted()) {
try {
Resource event = events.poll(1000, TimeUnit.MICROSECONDS);
if (event != null) {
onUpdateConfig(event);
}
} catch (InterruptedException e) {
//必须重置中断信号标记
Thread.currentThread().interrupt();
}
}
}
/**
* 资源-配置发生变更
*
* @param resource
*/
protected void onUpdateConfig(final Resource resource) {
//省略很多行
/获取旧的配置项
//获取新的配置项
//判断是否变更或者新增或者有删除
//通知配置***,最后还是会执行到notifier.send()
inform(newProperty, listeners);
}
}复制代码
2、***容器Observer
Observer包含了各种Listener,同时拥有一个ConfiguratorManager的成员变量,Lister最终会传递到该变量中
/**
* ***容器
*/
public class Observer implements InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
protected ConfiguratorManager manager;
/**
*
* 省略其他
*/
public void addPropertyListener(final PropertyListener listener) {
propertyListeners.add(listener);
}
/**
* 添加***
*/
public synchronized void execute() {
if (!processed) {
if (manager != null) {
for (PropertyListener listener : propertyListeners) {
manager.addListener(listener);
}
for (ConfigurationListener listener : configurationListeners) {
manager.addListener(listener);
}
for (Map.Entry<String, String> entry : scriptListeners.entrySet()) {
manager.addListener(new JavaScriptListener(entry.getKey(), entry.getValue(), context));
}
//针对@ConfigurationPropties注解
for (ConfigPropertiesBean cfgBean : beans) {
String prefix = cfgBean.getPrefix();
Object bean = cfgBean.getBean();
if (bean == null && context.containsBean(cfgBean.getBeanName())) {
bean = context.getBean(cfgBean.getBeanName());
}
if (bean != null) {
//添加***
process(bean, cfgBean.getBeanName(), prefix);
}
}
}
processed = true;
}
}
/**
* 当BeanFactory设置完Bean属性后会调用此方法,可以添加初始化方法
***/
@Override
public void afterPropertiesSet() throws Exception {
//在BeanDefinition中定义注入的
if (fieldListeners != null) {
for (FieldListener fieldListener : fieldListeners) {
addFieldListener(fieldListener);
}
}
if (methodListeners != null) {
for (MethodListener methodListener : methodListeners) {
addMethodListener(methodListener);
}
}
}
/**
* 获取上下文的引用,需要实现ApplicationContextAware接口
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.context = applicationContext;
}
@Override
public void onApplicationEvent(final ContextRefreshedEvent event) {
if (event.getSource() == context) {
execute();
}
}
}复制代码
如果在上下文中部署一个实现了ApplicationListener接口的Bean,那么每当在一个ApplicationEvent发布到ApplicationContext时,这个Bean会得到通知,其实这就是标准的Oberver设计模式
当ApplicationContext实例完成后,会调用onApplicationEvent()方法,执行execute()方法,然后将PropertyListener/ConfigurationLister添加到ConfiguratorManager实例中
3、Bean实例化后置处理器ConfigPostProcessor
/**
* Bean实例化后置处理器,保存ConfiguratorManager实例,处理Bean的配置
*
*/
public class ConfigPostProcessor implements BeanPostProcessor, PriorityOrdered, ApplicationContextAware {
protected ApplicationContext applicationContext;
protected Observer observer;
/**
* PostProcessBeforeInitialization 方法会在Bean构造完成后(构造方法执行完成),初始化方法(init-method)方法调用之前被调用
* */
@Override
public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
//处理配置对象
if (!process(bean, beanName, observer)) {
//处理属性***
doWithFields(bean.getClass(), new FieldCallback() {
@Override
public void doWith(final Field field) {
process(field, bean, beanName, observer);
}
}, new FieldFilter() {
@Override
public boolean matches(final Field field) {
return !Modifier.isFinal(field.getModifiers()) && !Modifier.isStatic(field.getModifiers());
}
});
//处理方法生成的配置对象
doWithMethods(bean.getClass(), new MethodCallback() {
@Override
public void doWith(Method method) {
process(method, bean, beanName, observer);
}
});
}
return bean;
}
/**
* 处理方法
*
* @param method
* @param bean
* @param beanName
*/
protected void process(final Method method, final Object bean, final String beanName, final Observer observer) {
}
/**
* 处理字段
*
* @param field
* @param bean
* @param beanName
*/
protected void process(final Field field, final Object bean, final String beanName, final Observer observer) {
//SPI服务发现
for (FieldProcessor processor : FIELD.extensions()) {
processor.process(bean, beanName, field, observer);
}
}
/**
* 处理Bean
*
* @param bean
* @param beanName
* @return
*/
protected boolean process(final Object bean, final String beanName, final Observer observer) {
}
/**
*维持Observer的成员变量
**/
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (bean instanceof ConfiguratorManager && observer.getManager() == null) {
observer.setManager((ConfiguratorManager) bean);
}
if (bean instanceof ConfigurationListener) {
observer.addConfigurationListener((ConfigurationListener) bean);
}
if (bean instanceof PropertyListener) {
observer.addPropertyListener((PropertyListener) bean);
}
return bean;
}
@Override
public int getOrder() {
//优先级要放最低
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
this.observer = context.getBean(Observer.class);
}
}复制代码
我们可以看到ConfigPostProcessor加载通过SPI服务发现的方法\字段处理类,然后执行process方法,其内部封装了Observer.addPropertyListener
4、Bean工厂后置处理器PropertySourcesFactorPostProcess
/**
* 注册PropertySource
*/
public class PropertySourcesFactoryPostProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {
protected ConfigurableEnvironment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = (ConfigurableEnvironment) environment;
}
@Override
public int getOrder() {
//最低优先级,先处理Spring内置的PropertySources
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void postProcessBeanFactory(final ConfigurableListableBeanFactory factory) throws BeansException {
//其它的PropertySource已经加载了,可以安全创建ConfiguratorManager
ConfiguratorManager manager = factory.getBean(CONFIGURATOR_MANAGER, ConfiguratorManager.class);
Observer observer = factory.getBean(OBSERVER_BEAN_NAME, Observer.class);
//将manager设置为observer的成员变量
if (observer.getManager() == null) {
observer.setManager(manager);
}
// 注册 Spring 属性配置
environment.getPropertySources().addFirst(new ConfigSource(manager));
//Bean的定义已经注册,由于ConfiguratorManager延迟加载,存在Bean定义中的占位符没有被替换的情况。
//用ConfiguratorManager替换一下剩余的占位符
resolvePlaceHolder(factory, manager);
}
/**
* 解析Bean的变量
*
* @param factory
* @param manager
*/
protected void resolvePlaceHolder(final ConfigurableListableBeanFactory factory, final ConfiguratorManager manager) {
StringValueResolver valueResolver = new ConfigResolver(manager);
BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);
String[] beanNames = factory.getBeanDefinitionNames();
for (String beanName : beanNames) {
if (!factory.containsSingleton(beanName)) {
//Bean已经实例化为单例
BeanDefinition bd = factory.getBeanDefinition(beanName);
try {
visitor.visitBeanDefinition(bd);
} catch (Exception ex) {
throw new BeanDefinitionStoreException(bd.getResourceDescription(), beanName, ex.getMessage(), ex);
}
}
}
// New in Spring 2.5: resolve placeholders in alias target names and aliases as well.
factory.resolveAliases(valueResolver);
// New in Spring 3.0: resolve placeholders in embedded values such as annotation attributes.
factory.addEmbeddedValueResolver(valueResolver);
}
}复制代码
可以看到在PostProcessBeanFactory方法中,实例化了ConfiguratorManager和Observer,并把Manager设置为Observer的成员变量。另外构造了一个包含Manager的配置属性源propertrySources(属性集合,内部封装个多个k/v),并放到Sping属性源的第一个
5、资源配置器
public abstract class AbstractConfigurator implements Configurator, Prototype {
/**
* 初始化
*
* @param context
* @throws Exception
*/
Configurator setup(Context context) throws Exception {
}
/**
* 拉取配置
*
* @param version 当前版本
* @param longPolling 长轮询时间
* @return
* @throws Exception
*/
Configuration pull(long version, int longPolling) throws Exception {
}
/**
* 监听配置变化
*
* @param version 当前版本
* @return
*/
boolean watch(long version) {
//这里只判断properties是否有更新,不涉及更新的具体类型
if (!configuration.equals(resource.getConfiguration())) {
resource.setConfiguration(configuration);
//最终调用的是events.add(resource),也就是manager中的那个events
context.fire(resource);
}
}
/**
* 停止监听
*/
void stop() {}
/**
* 资源来源
*
* @return 资源来源。
*/
Source source() {}
/**
* 从URL中返回资源名称
*
* @param url URL
* @return 资源名称
*/
String name(URL url){}
}复制代码
Configuration主要是对properties进行操作,Resource封装了Configuration。DUCC通过SPI服务发现将FileConfigurator、SystemConfigurator等extends了AbstractConfigurator的类自动加载进来从而达到可插拨扩展其他配置源的效果,也是通过这种机制支持所有数据格式、适配其他操作系统、实现方法字段属性配置化
6、SPI服务发现
Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,常见的JDBC、SLF4门面就是通过这个实现的,下面是DUCC的运用
/**
* 插件管理器
*/
public interface Plugin {
/**
* 字段处理器扩展点
*/
ExtensionPoint<FieldProcessor, String> FIELD = new ExtensionPointLazy<FieldProcessor, String>(FieldProcessor.class, SpiLoader.INSTANCE, null, null);
/**
* Bean处理器扩展点
*/
ExtensionPoint<BeanProcessor, String> BEAN = new ExtensionPointLazy<BeanProcessor, String>(BeanProcessor.class, SpiLoader.INSTANCE, null, null);
/**
* 方法处理器扩展点
*/
ExtensionPoint<MethodProcessor, String> METHOD = new ExtensionPointLazy<MethodProcessor, String>(MethodProcessor.class, SpiLoader.INSTANCE, null, null);
}
public class SpiLoader implements ExtensionLoader {
public static final ExtensionLoader INSTANCE = new SpiLoader();
public SpiLoader() {
}
public <T> Collection<Plugin<T>> load(Class<T> clazz) {
if (clazz == null) {
return null;
} else {
List<Plugin<T>> result = new LinkedList();
ServiceLoader<T> plugins = ServiceLoader.load(clazz);
Iterator var4 = plugins.iterator();
while(var4.hasNext()) {
//这里的plugin不是上面的那个plugin
T plugin = var4.next();
result.add(new Plugin(new Name(plugin.getClass()), plugin, this));
}
return result;
}
}
}复制代码
通过SPI机制将接口的实现类全部加载并实例化一遍,前提是实现类名称放在"META-INF/services/接口名"。当在ConfigPostProcessor中执行METHOD.extensions()时会将实现了MethodProcessor接口的实例取出来
三、SpingBoot注解(@EnableLafConfig)流程总结
DUCC通过实现ImportBeanDefinitionRegistrar接口,将指定的类注册到Spring Boot容器中,另外必须定义一个Java配置类(带有注解@Configuration)通过@Import指定ImportBeanDefinitionRegistrar的实现类。在registerBeanDefinitions()完成了以下几个类的BeanDefinition的注册:
- 1、配置管理器ConfiguratorManager,***容器Observer
- 2、Bean实例化后置处理器ConfigPostProcessor
- 3、Bean工厂后置处理器PropertySourcesFactoryPostProcessor
四、Spring初始化流程
主要流程在AbstractApplicationContext.refresh()中
流程图备注
- 1、InvokeBeanFactoryPostProcessors()
在Bean未开始实例化时,执行工厂后置处理器,会查找所有BeanFactoryPostProcessor实现类Bean,并且调用方法PostProcessBeanDefinitionRegistry,修改Definition的定义
注意:DUCC中PropertySourcesFactoryPostProcessor实现了BeanFactoryPostProcessor把ConfiguratorManager赋值给Observer - 2、RegisterBeanPostProcessors()
将BeanPostProcessors的实现类添加到工厂的RegisterBeanPostProcessors中
五、Bean实例生命周期
文章来源:www.liangsonghua.me
作者介绍:京东资深工程师-梁松华,长期关注稳定性保障、敏捷开发、JAVA高级、微服务架构