第五届阿里天池中间件比赛经历分享
本文记录了作者与队友们参加2019年第五届阿里天池中间件的经历。初赛排名175/4000+队伍,幸运进入决赛。虽然最终方案比较简单,但是过程很是曲折。最后通过高分选手开源的代码,总结下不足与经验。决赛正在进行中,本文会不断更新。
初赛 自适应负载均衡算法
题目
三个provider,200:450:650,一个consumer。provider内部通过信号量模拟处理能力的动态变化。信号量数小于线程池大小,当超过信号量数量的请求到达时,wait。当超时时,consumer会接收到异常,认定此次请求失败。最终的评定标准是,60秒内的成功请求数最大。
思路
- 最开始想根据cpu、waiting线程数 回调,成绩不理想。原因:拿到回调时已发生变化
- 后来直接根据线程的比例进行加权随机,成绩小幅提升,但由于是随机的,还是有可能造成某个producer收到过多请求,导致请求积压甚至超时,使得tps不高。
- 捕获请求超时的Exception。当发往某个provider的请求出现了超时说明该provider已经撑慢了,所以接下来进行加权随机时,将该provider剔除。等到一定时间之后(50ms),才能参加随机。但是这个方案的成绩几乎没有提升。经我们分析,发现Exception的出现纪律并不多,导致效果不是很好。而且在50ms的退让期间内,其他的provider也可能被撑爆。
- 最后做了个AOP,可以动态的变化权重,此方案也是最终的最高分,175名。一开始按照200:450:600的权重随机分配,当分配给某个provider后,将它的权重减一。当收到该provider的回复后,权重再加一。这样就权重就可以动态的变化:在某个请求发送后还未处理完成,也就是consumer收到回复前,该producer的权重就会缩小。该方案的压测日志可以看出,每个请求几乎都能得到回复,不会超时。也就是说,provider的处理能力还没有达到极限,瓶颈在consumer这里。
- 后来想了一个优化方案:既然consumer出现了瓶颈,我们是否能将计算权重的工作分散到provider上,然producer自己统计收到了多少请求,完成了多少请求,两数之差就是处理中的请求,然后让provider通过回调发送给consumer。但是这种方案的分数却急剧下降。通过日志的调试,发现问题是其实跟第一个方案是差不多的:当consumer收到回调时,provider的统计值以及产生了很大变化。导致consumer收到后把它当做权重来计算后,会产生很大误差。
最后由于截止日期临近,成就就停留在了126.6w分,175名。但是也成功进入了决赛圈(前200/4000+队伍)。
大佬们的方案
初赛截止后,有部分选手开源了自己的代码。我们学习了一下,其实我们缺少最终要的一点,也是高分的队伍关注的一点就是如何感知provider的处理能力动态变化:
https://zhuanlan.zhihu.com/p/71394061 https://mp.weixin.qq.com/s/hGTBRfdivxFCqJ-c4gPVaw
总结一下:
- 我们本应该想到却没有想到的就是使用一个令牌队列,提前按照线程数比例填满队列。就不用每次再进行权重的计算,而是直接从队列中取令牌,然后直接发往对应的provider,当受到回复后归还令牌。这样就大大减少了计算权重的时间。而且请求线程数总共1024,所以队列中三种令牌之和应该就是1024,这样每个provider就绝对不会撑爆。这种方案,成绩能打到127-128w,进入前100名。不过这也只是个小优化,并没有解决感知provider处理能力的问题。
- 如何捕捉provider处理能力变化:
方案一:https://mp.weixin.qq.com/s/hGTBRfdivxFCqJ-c4gPVaw写Scala的老王
借鉴TCP拥塞窗口的概念,将令牌和RTT存入跳表,跳表会根据RTT排序,此时的问题是:由于每次只从队列头部取令牌,所以当某个provider产生了一个大RTT,该provider的令牌就会排到后面,很难被取到(因为小RTT的令牌的不断地放回到它的前面),相当于权重下降后很难回复。所以改进措施是:当收到一个RTT小于3ms的回复时,将该provider提权,即从跳表的尾部抽出一个令牌,重新放到跳表中间位置。该方案能够达到130W分,最终老王获得了初赛第六。
方案二:https://zhuanlan.zhihu.com/p/71394061
基于以下思路:
- CPU负载与线程数存在着某种联系:[1,2,3]与[200,450,650]。即线程数与CPU核心数量的比例大致相同。
- CPU负载与请求数存在一个正比关系。不管几核的CPU,只要没有请求,CPU负载都为零,这个很符合逻辑是不是;而对于处理相同的请求数,理论上讲CPU的核心越大,负载越小。但反过来讲,当CPU负载差不多的时候,双核的CPU所处理的请求数是不是就必然是单核CPU的两倍呢?不一定。
采用如下公式:
其中log(最大线程数)是CPU核心数的变种计法。由于log(1)=0,所以就借用了最大线程数来近似替代CPU核心数。
该方案分数能到128W+
决赛 消息持久化存储引擎
最终获得决赛83/200名
题目
实现一个进程内消息持久化存储引擎,要求包含以下功能:
A. 查询一定时间窗口内的消息 B. 对一定时间窗口内的消息属性某个字段求平均,以及求和
例子:t表示时间,时间窗口(1000, 1002)表示: t>1000 & t<1002
消息内容简化成两个字段,一个是业务字段a(整数),一个是时间戳(long)。
1800内接收20亿条,每条50B,总共100G,每秒大概100W条,50M
SSD读写300M/s
思路
初版
- 分片:因为内存保存不下全部Msg,需要分片存储和读取。参考kafka的时间戳索引概念,按照时间戳分片&落盘,落盘文件名即为分片index。比如插入ts=0x 0111时,如果分区个数为4,则index=ts>>2 = 01
- 当某一个分片满了之后先不落盘,因为有可能出现数据晚到,当晚到的数据到达是,对应的分片如果已经落盘了,还需要重新读出来、放入该分片中,再落盘(当然也可以直接追加到文件尾部,但是还不如一次性把文件写好效率高)。所以记录一个waterMark,这里代表已经落盘的shardId,当最新的分片Id超过waterMark2个时,再落盘waterMark++;
- 分片内为了保证顺序,使用索引,可以用跳表存储Msg(顺序插入时TreeSet红黑树旋转开销大)。查询时,定位文件分片+文件内部二分
- t可以相同,如何存储a,body?,所以每个t要对应一个List 和 List
- 底层存储:
- t,a,body可以分别存储到不同的文件,在查询avg时,不需要查询body
- t,a,body分片的粒度可以不同,考虑:t作为索引,是应该尽量保存到内存?还是可以缩小粒度,但是几乎不存储索引,而是让更多的body保存到内存,可以减少IO?
- nio 落盘
- 面向块,而不是字节
- 通道(Channel),一种新的原生I/O抽象概念
- 内存映射的文件
- 通过Selector多路复用、非阻塞的I/O能力实现可伸缩的服务器架构
- 序列化:Java自带序列化性能差、冗余多。考虑直接保存原始数据而不是对象。
- 查询
- lru:LRU或LFU,但是发现会发生大量minorGC,原因是生成了过多对象。所以就题目来讲,不需要使用对象封装消息,直接对t(long)或者body(byte数组)进行操作。
- 为何何最终采用LRU: LFU淘汰一定时期内被访问次数最少的页, 但是比赛的getT是平均的,最近访问的最少,反而后面更有可能被访问;LRU淘汰最近最少使用的页,不考虑访问次数
代码
消息写入:
// 保存A
int shardId = getShardAId(msgT);
NavigableMap<Long, List<Long>> targetShardA;
// 当分片还未创建,则生成分片
if (!ShardAHasCreated.get(shardId)) {
ConcurrentSkipListMap<Long, List<Long>> newShard = new ...
ShardAHasNotSaved.put(shardId, newShard);
// t-->t时刻的所有a , BlockingQueue<NavigableMap<Long, List<Long>>>, SaveAJob线程会负责将该queue落盘
shardsA2SaveQueue.put(newShard);
ShardAHasCreated.set(shardId);
}
targetShardA = ShardAHasNotSaved.get(shardId);
if (targetShardA == null) {// 数据晚到,但是分片早已落盘,则应当load
}
List<Long> arrA = targetShardA.computeIfAbsent(msgT, t -> new ArrayList<>());
arrA.add(message.getA());
// 保存Body
shardId = getShardBodyId(msgT);
NavigableMap<Long, List<byte[]>> targetShardBody;
if (!ShardBoydHasCreated.get(shardId)) {
ConcurrentSkipListMap<Long, List<byte[]>> newShard = new ..
ShardBodyHasNotSaved.put(shardId, newShard);
shardsBody2SaveQueue.put(newShard);
ShardBoydHasCreated.set(shardId);
}
targetShardBody = ShardBodyHasNotSaved.get(shardId);
if (targetShardBody == null){// 数据晚到,但是分片早已落盘,则应当load
}
List<byte[]> arrBody = targetShardBody.computeIfAbsent(msgT, t -> new ArrayList<>());
arrBody.add(message.getBody());
其中,获取分片的方法如下
private static int getShardAId(long t) {
//取t的高 SHARD_A_SHIFT 位,作为分片的index
return (int) (t >> SHARD_A_SHIFT);
}
private static long SHARD_A_NUM = 1L << 46;
private static int SHARD_A_SHIFT = 64 - (int) (Math.log(SHARD_A_NUM) / Math.log(2D));
private static int getShardBodyId(long t) {
return (int) (t >> SHARD_BODY_SHIFT);
}
private static long SHARD_BODY_NUM = 1L << 46;
private static int SHARD_BODY_SHIFT = 64 - (int) (Math.log(SHARD_BODY_NUM) / Math.log(2D));
消息落盘
private static final int SAVE_WAITING_INTERVAL_SHARD_NUM = 2;// 当第i+n个shard创建后,才落盘第i个分区
private static class SaveAJob implements Runnable {
public static final String SHARD_A_FILE_PREFIX = SAVE_PATH + "A";
private static final BitSet shardHasSaved = new BitSet();
public void run() {
while (true) {
StringBuilder sb = new StringBuilder();
NavigableMap<Long, List<Long>> shard = shardsA2SaveQueue.take();
while(shard.size()==0)
Thread.sleep(50);
int shard2SaveId = getShardAId(shard.firstKey());
while (shard2SaveId > getShardAId(maxT) - SAVE_WAITING_INTERVAL_SHARD_NUM)
Thread.sleep(50);
saveShard(ShardAHasNotSaved.get(shard2SaveId),
sb.append(SHARD_A_FILE_PREFIX).append(shard2SaveId).toString());
shardHasSaved.set(shard2SaveId);
ShardAHasNotSaved.remove(shard2SaveId);
sb.setLength(0);
}
}
private void saveShard(NavigableMap<Long, List<Long>> shard, String fileName) throws IOException {
Instant s = Instant.now();
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
FileChannel outFile = FileChannel.open(Paths.get(fileName), StandardOpenOption.APPEND,
StandardOpenOption.CREATE);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(shard);
byte[] shardBytes = baos.toByteArray();
for (int offset = 0; offset < shardBytes.length; offset += 4096) {
buffer.put(shardBytes, offset, shardBytes.length - offset < 4096 ? shardBytes.length - offset : 4096);
buffer.flip();
outFile.write(buffer);
buffer.clear();
}
outFile.close();
long l = Duration.between(s, Instant.now()).toMillis();
}
数据查询
public List<Message> getMessage(long aMin, long aMax, long tMin, long tMax) {
if (tMin < minT) tMin = minT;
if (tMax > maxT) tMax = maxT;
List<Message> res =
LongStream.rangeClosed(tMin, tMax)
.parallel()
.boxed()
.flatMap(t -> {
int shardId = getShardAId(t);
NavigableMap<Long, List<Long>> targetShard = loadShardA(shardId);
if (targetShard == null)
return null; //如果t不连续,有可能整个分片不存在
List<Long> longs = targetShard.get(t);
if (longs == null)
return null; //如果t不连续,有可能某个t不存在任何Msg
int shardIdB = getShardBodyId(t);
NavigableMap<Long, List<byte[]>> targetBodyShard = loadShardBody(shardIdB);
List<byte[]> bytes = targetBodyShard.get(t);
return IntStream.range(0, longs.size())
.mapToObj(i -> new Message(longs.get(i), t, bytes.get(i)));
})
.filter(msg -> msg.getA() >= aMin && msg.getA() <= aMax)
.collect(toList());
return res;
}
private static NavigableMap<Long, List<Long>> loadShardA(int shardId) {
shardACount.increment();
if (!SaveAJob.shardHasSaved.get(shardId)) {//还未落盘
return ShardAHasNotSaved.get(shardId);
}
return cache4A.get(shardId);
}
private static final LinkedHashLRUCache<Integer, NavigableMap<Long, List<Long>>> cache4A =
new LinkedHashLRUCache<>(SHARD_A_CACHE_SIZE)
{
@Override
protected NavigableMap<Long, List<Long>> load(Integer shardId) {
NavigableMap<Long, List<Long>> targetShard = null;
targetShard = doLoadAShard(SaveAJob.SHARD_A_FILE_PREFIX + shardId);
return targetShard;
}
};
private static NavigableMap<Long, List<Long>> doLoadAShard(String fileName) throws IOException, ClassNotFoundException {
NavigableMap<Long, List<Long>> navigableMap;
FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(4096);
byte[] src = new byte[(int) fileChannel.size()];
int index = 0;
while (fileChannel.read(buffer) != -1) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
for (int i = 0; i < bytes.length; i++) {
src[index++] = bytes[i];
}
buffer.clear();
}
ByteArrayInputStream bais = new ByteArrayInputStream(src);
ObjectInputStream ois = new ObjectInputStream(bais);
navigableMap = (NavigableMap<Long, List<Long>>) ois.readObject();
fileChannel.close();
return navigableMap;
}
二版
学习kafka: 不缓存,直接落盘,顺序写,
- 发现效果一般,同步落盘还是异步落盘?
后期使用索引、学习ssd特性、读kafka源码?
最终获得决赛83/200名
总结与待优化:
- 插入
分线程存储不同分片&文件,则不需要做排序。搜索时直接合并即可,但要读取多个文件。
参考kafka的索引算法,分为baseIndex和relativeIndex,根据前者定位分片文件,根据后者直接定位到分片文件内的偏移量
- 存储
存储时做压缩
异步落盘
- 查询
根据比赛测试程序阶段,在批量读取的时候,提前批量message对象