Apollo组成
Config Service
提供配置的读取、推送等功能,服务对象是Apollo客户端
提供配置更新推送接口(基于Http long polling)
(1)服务端使用Spring DeferredResult实现异步化,从而大大增加长连接数量
(2)目前使用的tomcat embed默认配置是最多10000个连接(可以调整),使用了4C8G的虚拟机实测可以支撑10000个连接,所以满足需求(一个应用实例只会发起一个长连接)。Admin Service
提供配置的修改、发布等功能,服务对象是Apollo Portal(管理界面)Meta Server
Portal通过域名访问Meta Server获取Admin Service服务列表(IP+Port)
Client通过域名访问Meta Server获取Config Service服务列表(IP+Port)
Meta Server从Eureka获取Config Service和Admin Service的服务信息,相当于是一个Eureka Client
增设一个Meta Server的角色主要是为了封装服务发现的细节,对Portal和Client而言,永远通过一个Http接口获取Admin Service和Config Service的服务信息,而不需要关心背后实际的服务注册和发现组件
Meta Server只是一个逻辑角色,在部署时和Config Service是在一个JVM进程中的,所以IP、端口和Config Service一致Eureka
基于Eureka和Spring Cloud Netflix提供服务注册和发现
Config Service和Admin Service会向Eureka注册服务,并保持心跳
为了简单起见,目前Eureka在部署时和Config Service是在一个JVM进程中的(通过Spring Cloud Netflix)Portal
提供Web界面供用户管理配置
通过Meta Server获取Admin Service服务列表(IP+Port),通过IP+Port访问服务
在Portal侧做load balance、错误重试Client
Apollo提供的客户端程序,为应用提供配置获取、实时更新等功能
通过Meta Server获取Config Service服务列表(IP+Port),通过IP+Port访问服务
在Client侧做load balance、错误重试
Apollo运行
- 用户在配置中心对配置进行修改并发布
- 配置中心通知Apollo客户端有配置更新
- Apollo客户端从配置中心拉取最新的配置、更新本地配置并通知到应用
服务端设计
-
配置发布后的实时推送设计
在配置中心中,一个重要的功能就是配置发布后实时推送到客户端。下面我们简要看一下这块是怎么设计实现的。
配置发布过程:
(1)用户在Portal操作配置发布
(2)Portal调用Admin Service的接口操作发布
(3)Admin Service发布配置后,发送ReleaseMessage给各个Config Service
(4)Config Service收到ReleaseMessage后,通知对应的客户端
- 发送ReleaseMessage的实现方式
Admin Service在配置发布后,需要通知所有的Config Service有配置发布,从而Config Service可以通知对应的客户端来拉取最新的配置。
从概念上来看,这是一个典型的消息使用场景,Admin Service作为producer发出消息,各个Config Service作为consumer消费消息。通过一个消息组件(Message Queue)就能很好的实现Admin Service和Config Service的解耦。
在实现上,考虑到Apollo的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。
实现方式如下:
- Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender
- Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
- Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
- NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端
ReleaseMessageScanner:
public class ReleaseMessageScanner implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
maxIdScanned = loadLargestMessageId();
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
scanMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan and send message failed", ex);
} finally {
transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
}
private void scanMessages() {
boolean hasMoreMessages = true;
while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
hasMoreMessages = scanAndSendMessages();
}
}
/**
* scan messages and send
*
* @return whether there are more messages
*/
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
// 触发通知监听器处理“发布配置“消息
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}
public void addMessageListener(ReleaseMessageListener listener) {
if (!listeners.contains(listener)) {
listeners.add(listener);
}
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
}
ConfigServiceAutoConfiguration:
public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
//0. handle release message cache
releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
//1. handle gray release rule
releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
//2. handle server cache
releaseMessageScanner.addMessageListener(configService);
releaseMessageScanner.addMessageListener(configFileController);
//3. notify clients,把notificationControllerV2到releaseMessageScanner消息监听器
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
}
@RestController
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
try {
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
} catch (Throwable ex) {
Tracer.logError(ex);
}
if (CollectionUtils.isEmpty(notifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
Set<String> namespaces = Sets.newHashSet();
Map<String, Long> clientSideNotifications = Maps.newHashMap();
Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
String normalizedNamespace = notificationEntry.getKey();
ApolloConfigNotification notification = notificationEntry.getValue();
namespaces.add(normalizedNamespace);
clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
}
}
if (CollectionUtils.isEmpty(namespaces)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
/**
* 1、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
/**
* 2、check new release
*/
List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
List<ApolloConfigNotification> newNotifications =
getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
latestReleaseMessages);
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
}
return deferredResultWrapper.getResult();
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
configNotification.addMessage(content, message.getId());
//do async notification if too many clients
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
// 设置NotificationControllerV2 HTTP接口/notifications/v2的返回结果DeferredResult
results.get(i).setResult(configNotification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResultWrapper result : results) {
// 设置NotificationControllerV2 HTTP接口/notifications/v2的返回结果DeferredResult
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
}
- Config Service通知客户端的实现方式
NotificationControllerV2在得知有配置发布后是如何通知到客户端的呢?
实现方式如下:
(1)客户端会发起一个Http请求到Config Service的notifications/v2
接口,也就是NotificationControllerV2,参见RemoteConfigLongPollService
(2)NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult把请求挂起
(3)如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
(4)如果有该客户端关心的配置发布,NotificationControllerV2会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置。
DeferredResult
当一个请求到达API接口,如果该API接口的return返回值是DeferredResult,在没有超时或者DeferredResult对象设置setResult时,接口不会返回,但是Servlet容器线程会结束,DeferredResult另起线程来进行结果处理(即这种操作提升了服务短时间的吞吐能力),并setResult,如此以来这个请求不会占用服务连接池太久,如果超时或设置setResult,接口会立即返回。
客户端设计
Apollo客户端的实现原理:
- 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过Http Long Polling实现)
- 客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。
(1)这是一个fallback机制,为了防止推送机制失效导致配置不更新
(2)客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified
(3)定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property: apollo.refreshInterval来覆盖,单位为分钟。 - 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
- 客户端会把从服务端获取到的配置在本地文件系统缓存一份
在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置 - 应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知
RemoteConfigRepository:
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
this.trySync();
// 开启定时从服务端获取最新配置
this.schedulePeriodicRefresh();
// 调用RemoteConfigLongPollService开启长轮询
this.scheduleLongPollingRefresh();
}
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
// 调用Config Service的HTTP接口 /configs获取最新配置
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
//接收长轮询的通知
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
RemoteConfigLongPollService:
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}
private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
//ignore
}
}
// 调用Config Service的HTTP接口 /notifications/v2获取有变更的配置
// 如果有变更的配置,通知RemoteConfigRepository拉取最新配置
doLongPollingRefresh(appId, cluster, dataCenter, secret);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}