java 中间件介绍: Apollo

Apollo

简介

Apollo中的几个核心概念:

  1. application (应用)
    • 这个很好理解,就是实际使用配置的应用,Apollo客户端在运行时需要知道当前应用是谁,从而可以去获取对应的配置
    • 每个应用都需要有唯一的身份标识 – appId,我们认为应用身份是跟着代码走的,所以需要在代码中配置,具体信息请参见Java客户端使用指南
  2. environment (环境)
    • 配置对应的环境,Apollo客户端在运行时需要知道当前应用处于哪个环境,从而可以去获取应用的配置
    • 我们认为环境和代码无关,同一份代码部署在不同的环境就应该能够获取到不同环境的配置
    • 所以环境默认是通过读取机器上的配置(server.properties中的env属性)指定的,不过为了开发方便,我们也支持运行时通过System Property等指定,具体信息请参见Java客户端使用指南
  3. cluster (集群)
    • 一个应用下不同实例的分组,比如典型的可以按照数据中心分,把上海机房的应用实例分为一个集群,把北京机房的应用实例分为另一个集群。
    • 对不同的cluster,同一个配置可以有不一样的值,如zookeeper地址。
    • 集群默认是通过读取机器上的配置(server.properties中的idc属性)指定的,不过也支持运行时通过System Property指定,具体信息请参见Java客户端使用指南
  4. namespace (命名空间)
    • 一个应用下不同配置的分组,可以简单地把namespace类比为文件,不同类型的配置存放在不同的文件中,如数据库配置文件,RPC配置文件,应用自身的配置文件等
    • 应用可以直接读取到公共组件的配置namespace,如DAL,RPC等
    • 应用也可以通过继承公共组件的配置namespace来对公共组件的配置做调整,如DAL的初始数据库连接数

上图简要描述了Apollo的总体设计,我们可以从下往上看:

  • Config Service提供配置的读取、推送等功能,服务对象是Apollo客户端
  • Admin Service提供配置的修改、发布等功能,服务对象是Apollo Portal(管理界面)
  • Config Service和Admin Service都是多实例、无状态部署,所以需要将自己注册到Eureka中并保持心跳
  • 在Eureka之上我们架了一层Meta Server用于封装Eureka的服务发现接口
  • Client通过域名访问Meta Server获取Config
  • Service服务列表(IP+Port),而后直接通过IP+Port访问服务,同时在Client侧会做load balance、错误重试
  • Portal通过域名访问Meta Server获取Admin
  • Service服务列表(IP+Port),而后直接通过IP+Port访问服务,同时在Portal侧会做load balance、错误重试

为了简化部署,我们实际上会把Config Service、Eureka和Meta Server三个逻辑角色部署在同一个JVM进程中

基础

如何工作?

@EnableApolloConfig引出ApolloConfigRegistrar注册类

  • 因为继承了ImportBeanDefinitionRegistrar
  • 从而调用了registerBeanDefinitions
  • 从而注册了PropertySourcesProcessor


PropertySourcesProcessor引出ConfigService
ConfigService引出ConfigManager并且是guice框架
首先Spi找出DefaultInjector
然后利用DefaultInjector找出相关的guice框架的关联类
利用guice找到ConfigManager—->DefaultConfigManager


DefaultConfigManager又可以看到getConfig方法主要来找配置
如果不存在则利用ConfigFactoryManager去寻找工厂并通过工厂创建相关的config
然后通过DefaultConfigFactory去create相关的配置
先看本地如果本地没有则加载网络上的


RemoteConfigRepository,主要获取本地的appid和cluster以及dataCenter
然后带有重试机制的去拿服务器的相关配置
然后还引出了RemoteConfigRepository线程池定时刷新远程一旦previous!=current就获取
并激发RepositoryChangeListener的观察者
因为LocalFileConfigRepository实现了RepositoryChangeListener所以会做更新本地缓存

相关配置

1
2
开发环境名和集群名
/opt/settings/server.properties

源码相关-客户端

客户端模块-结构图

客户端-流程时序图

Apollo Bean配置获取流程

ApolloConfigRegistrar

@EnableApolloConfig引出ApolloConfigRegistrar注册类
因为继承了ImportBeanDefinitionRegistrar
从而调用了registerBeanDefinitions
从而注册了PropertySourcesProcessor

1
在AbstractApplicationContext#invokeBeanFactoryPostProcessors里的过程

ConfigService

PropertySourcesProcessor引出ConfigService

AbstractApplicationContext#invokeBeanFactoryPostProcessors方法体系中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PropertySourcesProcessor#postProcessBeanFactory
PropertySourcesProcessor#initializePropertySources


protected void initializePropertySources() {

....

ImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());
Iterator<Integer> iterator = orders.iterator();

while (iterator.hasNext()) {
int order = iterator.next();
for (String namespace : NAMESPACE_NAMES.get(order)) {
Config config = ConfigService.getConfig(namespace);

composite.addPropertySource(new ConfigPropertySource(namespace, config));
}
}

environment.getPropertySources().addFirst(composite);
}

ConfigManager

由ConfigService引出ConfigManager并且是guice框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ConfigService#getConfig

public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}

private ConfigManager getManager() {
if (m_configManager == null) {
synchronized (this) {
if (m_configManager == null) {
m_configManager = ApolloInjector.getInstance(ConfigManager.class);
}
}
}
return m_configManager;
}

首先Spi找出DefaultInjector,然后利用DefaultInjector找出相关的guice框架的关联类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ServiceBootstrap.loadFirst(Injector.class);

public DefaultInjector() {
try {
m_injector = Guice.createInjector(new ApolloModule());
} catch (Throwable ex) {
ApolloConfigException exception = new ApolloConfigException("Unable to initialize Guice Injector!", ex);
Tracer.logError(exception);
throw exception;
}
}

private static class ApolloModule extends AbstractModule {
@Override
protected void configure() {
bind(ConfigManager.class).to(DefaultConfigManager.class).in(Singleton.class);
bind(ConfigFactoryManager.class).to(DefaultConfigFactoryManager.class).in(Singleton.class);
bind(ConfigRegistry.class).to(DefaultConfigRegistry.class).in(Singleton.class);
bind(ConfigFactory.class).to(DefaultConfigFactory.class).in(Singleton.class);
bind(ConfigUtil.class).in(Singleton.class);
bind(HttpUtil.class).in(Singleton.class);
bind(ConfigServiceLocator.class).in(Singleton.class);
bind(RemoteConfigLongPollService.class).in(Singleton.class);
}
}

看完guice的绑定就知道如何利用guice找到ConfigManager—->DefaultConfigManager

ConfigFactoryManager&&DefaultConfigFactory

在DefaultConfigManager又可以看到getConfig方法主要来找配置,如果不存在则利用ConfigFactoryManager去寻找工厂并通过工厂创建相关的config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);

if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);

if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);

config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}

return config;
}


//又通过guice寻找ConfigFactoryManager
public DefaultConfigManager() {
m_factoryManager = ApolloInjector.getInstance(ConfigFactoryManager.class);
}

从ConfigFactoryManager找到了相关的DefaultConfigFactory

然后通过DefaultConfigFactory去create相关的配置,先看本地如果本地没有则加载网络上的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Config create(String namespace) {
DefaultConfig defaultConfig = new DefaultConfig(namespace, createLocalConfigRepository(namespace));
return defaultConfig;
}


LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn("==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",namespace);
return new LocalFileConfigRepository(namespace);
}
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}


RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}

RemoteConfigRepository

主要获取本地的appid和cluster以及dataCenter,然后带有重试机制的去拿服务器的相关配置
然后还引出了RemoteConfigRepository线程池定时刷新远程一旦previous!=current就获取
并激发RepositoryChangeListener的观察者
因为LocalFileConfigRepository实现了RepositoryChangeListener所以会做更新本地缓存

这个类主要加载远程apollo服务器上的配置文件的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private ApolloConfig loadApolloConfig() {
.....

String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();

......

List<ServiceDTO> configServices = getConfigServices();
String url = null;
for (int i = 0; i < maxRetries; i++) {
.....

for (ServiceDTO configService : randomConfigServices) {

url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());


HttpRequest request = new HttpRequest(url);

Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {

HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);

...


ApolloConfig result = response.getBody();

return result;
} catch (ApolloConfigStatusCodeException ex) {
.....
} catch (Throwable ex) {
......
} finally {
transaction.complete();
}

// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}

}
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
}

RemoteConfigLongPollService

后期主要以此Service进行长轮训为主

RemoteConfigRepository构造函数中含有的scheduleLongPollingRefresh

1
2
3
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}

开启了远程配置长轮训服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}


......
private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
//ignore
}
}
doLongPollingRefresh(appId, cluster, dataCenter);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}

长轮训一旦接收到notification变更通知,则将监听器通知变更,RemoteConfigRepository得到通知进行变更,然后通知LocalFileConfigRepository去变更,再通知到DefaultConfig去变更相关数据