当前位置: 首页>后端>正文

阿里天池怎么用gpu 阿里天池比赛

第五届阿里天池中间件比赛经历分享

本文记录了作者与队友们参加2019年第五届阿里天池中间件的经历。初赛排名175/4000+队伍,幸运进入决赛。虽然最终方案比较简单,但是过程很是曲折。最后通过高分选手开源的代码,总结下不足与经验。决赛正在进行中,本文会不断更新。

初赛 自适应负载均衡算法

题目

三个provider,200:450:650,一个consumer。provider内部通过信号量模拟处理能力的动态变化。信号量数小于线程池大小,当超过信号量数量的请求到达时,wait。当超时时,consumer会接收到异常,认定此次请求失败。最终的评定标准是,60秒内的成功请求数最大。

思路

  1. 最开始想根据cpu、waiting线程数 回调,成绩不理想。原因:拿到回调时已发生变化
  2. 后来直接根据线程的比例进行加权随机,成绩小幅提升,但由于是随机的,还是有可能造成某个producer收到过多请求,导致请求积压甚至超时,使得tps不高。
  3. 捕获请求超时的Exception。当发往某个provider的请求出现了超时说明该provider已经撑慢了,所以接下来进行加权随机时,将该provider剔除。等到一定时间之后(50ms),才能参加随机。但是这个方案的成绩几乎没有提升。经我们分析,发现Exception的出现纪律并不多,导致效果不是很好。而且在50ms的退让期间内,其他的provider也可能被撑爆。
  4. 最后做了个AOP,可以动态的变化权重,此方案也是最终的最高分,175名。一开始按照200:450:600的权重随机分配,当分配给某个provider后,将它的权重减一。当收到该provider的回复后,权重再加一。这样就权重就可以动态的变化:在某个请求发送后还未处理完成,也就是consumer收到回复前,该producer的权重就会缩小。该方案的压测日志可以看出,每个请求几乎都能得到回复,不会超时。也就是说,provider的处理能力还没有达到极限,瓶颈在consumer这里。
  5. 后来想了一个优化方案:既然consumer出现了瓶颈,我们是否能将计算权重的工作分散到provider上,然producer自己统计收到了多少请求,完成了多少请求,两数之差就是处理中的请求,然后让provider通过回调发送给consumer。但是这种方案的分数却急剧下降。通过日志的调试,发现问题是其实跟第一个方案是差不多的:当consumer收到回调时,provider的统计值以及产生了很大变化。导致consumer收到后把它当做权重来计算后,会产生很大误差。
    最后由于截止日期临近,成就就停留在了126.6w分,175名。但是也成功进入了决赛圈(前200/4000+队伍)。

大佬们的方案

初赛截止后,有部分选手开源了自己的代码。我们学习了一下,其实我们缺少最终要的一点,也是高分的队伍关注的一点就是如何感知provider的处理能力动态变化:
https://zhuanlan.zhihu.com/p/71394061 https://mp.weixin.qq.com/s/hGTBRfdivxFCqJ-c4gPVaw
总结一下:

  1. 我们本应该想到却没有想到的就是使用一个令牌队列,提前按照线程数比例填满队列。就不用每次再进行权重的计算,而是直接从队列中取令牌,然后直接发往对应的provider,当受到回复后归还令牌。这样就大大减少了计算权重的时间。而且请求线程数总共1024,所以队列中三种令牌之和应该就是1024,这样每个provider就绝对不会撑爆。这种方案,成绩能打到127-128w,进入前100名。不过这也只是个小优化,并没有解决感知provider处理能力的问题。
  2. 如何捕捉provider处理能力变化:
    方案一:https://mp.weixin.qq.com/s/hGTBRfdivxFCqJ-c4gPVaw
    写Scala的老王借鉴TCP拥塞窗口的概念,将令牌和RTT存入跳表,跳表会根据RTT排序,此时的问题是:由于每次只从队列头部取令牌,所以当某个provider产生了一个大RTT,该provider的令牌就会排到后面,很难被取到(因为小RTT的令牌的不断地放回到它的前面),相当于权重下降后很难回复。所以改进措施是:当收到一个RTT小于3ms的回复时,将该provider提权,即从跳表的尾部抽出一个令牌,重新放到跳表中间位置。该方案能够达到130W分,最终老王获得了初赛第六。
    方案二:https://zhuanlan.zhihu.com/p/71394061

基于以下思路:

  • CPU负载与线程数存在着某种联系:[1,2,3]与[200,450,650]。即线程数与CPU核心数量的比例大致相同。
  • CPU负载与请求数存在一个正比关系。不管几核的CPU,只要没有请求,CPU负载都为零,这个很符合逻辑是不是;而对于处理相同的请求数,理论上讲CPU的核心越大,负载越小。但反过来讲,当CPU负载差不多的时候,双核的CPU所处理的请求数是不是就必然是单核CPU的两倍呢?不一定。

采用如下公式:

阿里天池怎么用gpu 阿里天池比赛,阿里天池怎么用gpu 阿里天池比赛_中间件,第1张

阿里天池怎么用gpu 阿里天池比赛,阿里天池怎么用gpu 阿里天池比赛_中间件_02,第2张

其中log(最大线程数)是CPU核心数的变种计法。由于log(1)=0,所以就借用了最大线程数来近似替代CPU核心数。

该方案分数能到128W+

决赛 消息持久化存储引擎

最终获得决赛83/200名

题目

实现一个进程内消息持久化存储引擎,要求包含以下功能:

A. 查询一定时间窗口内的消息 B. 对一定时间窗口内的消息属性某个字段求平均,以及求和

例子:t表示时间,时间窗口(1000, 1002)表示: t>1000 & t<1002

消息内容简化成两个字段,一个是业务字段a(整数),一个是时间戳(long)。

1800内接收20亿条,每条50B,总共100G,每秒大概100W条,50M

SSD读写300M/s

思路

初版
  • 分片:因为内存保存不下全部Msg,需要分片存储和读取。参考kafka的时间戳索引概念,按照时间戳分片&落盘,落盘文件名即为分片index。比如插入ts=0x 0111时,如果分区个数为4,则index=ts>>2 = 01
  • 当某一个分片满了之后先不落盘,因为有可能出现数据晚到,当晚到的数据到达是,对应的分片如果已经落盘了,还需要重新读出来、放入该分片中,再落盘(当然也可以直接追加到文件尾部,但是还不如一次性把文件写好效率高)。所以记录一个waterMark,这里代表已经落盘的shardId,当最新的分片Id超过waterMark2个时,再落盘waterMark++;
  • 分片内为了保证顺序,使用索引,可以用跳表存储Msg(顺序插入时TreeSet红黑树旋转开销大)。查询时,定位文件分片+文件内部二分
  • t可以相同,如何存储a,body?,所以每个t要对应一个List 和 List
  • 底层存储:
  • t,a,body可以分别存储到不同的文件,在查询avg时,不需要查询body
  • t,a,body分片的粒度可以不同,考虑:t作为索引,是应该尽量保存到内存?还是可以缩小粒度,但是几乎不存储索引,而是让更多的body保存到内存,可以减少IO?
  • nio 落盘
  • 面向块,而不是字节
  • 通道(Channel),一种新的原生I/O抽象概念
  • 内存映射的文件
  • 通过Selector多路复用、非阻塞的I/O能力实现可伸缩的服务器架构
  • 序列化:Java自带序列化性能差、冗余多。考虑直接保存原始数据而不是对象。
  • 查询
  • lru:LRU或LFU,但是发现会发生大量minorGC,原因是生成了过多对象。所以就题目来讲,不需要使用对象封装消息,直接对t(long)或者body(byte数组)进行操作。
  • 为何何最终采用LRU: LFU淘汰一定时期内被访问次数最少的页, 但是比赛的getT是平均的,最近访问的最少,反而后面更有可能被访问;LRU淘汰最近最少使用的页,不考虑访问次数

代码

消息写入:

// 保存A
int shardId = getShardAId(msgT);
NavigableMap<Long, List<Long>> targetShardA;
// 当分片还未创建,则生成分片
if (!ShardAHasCreated.get(shardId)) {
    ConcurrentSkipListMap<Long, List<Long>> newShard = new ...
    ShardAHasNotSaved.put(shardId, newShard);
// t-->t时刻的所有a , BlockingQueue<NavigableMap<Long, List<Long>>>,  SaveAJob线程会负责将该queue落盘
    shardsA2SaveQueue.put(newShard);
    ShardAHasCreated.set(shardId);
}
targetShardA = ShardAHasNotSaved.get(shardId);
if (targetShardA == null) {// 数据晚到,但是分片早已落盘,则应当load
}

List<Long> arrA = targetShardA.computeIfAbsent(msgT, t -> new ArrayList<>());
arrA.add(message.getA());

// 保存Body
shardId = getShardBodyId(msgT);
NavigableMap<Long, List<byte[]>> targetShardBody;
if (!ShardBoydHasCreated.get(shardId)) {
    ConcurrentSkipListMap<Long, List<byte[]>> newShard = new ..
    ShardBodyHasNotSaved.put(shardId, newShard);
    shardsBody2SaveQueue.put(newShard);
    ShardBoydHasCreated.set(shardId);
}
targetShardBody = ShardBodyHasNotSaved.get(shardId);
if (targetShardBody == null){// 数据晚到,但是分片早已落盘,则应当load
}
List<byte[]> arrBody = targetShardBody.computeIfAbsent(msgT, t -> new ArrayList<>());
arrBody.add(message.getBody());

其中,获取分片的方法如下

private static int getShardAId(long t) {
    //取t的高 SHARD_A_SHIFT 位,作为分片的index
   return (int) (t >> SHARD_A_SHIFT);
}

private static long SHARD_A_NUM = 1L << 46;
private static int SHARD_A_SHIFT = 64 - (int) (Math.log(SHARD_A_NUM) / Math.log(2D));  

private static int getShardBodyId(long t) {    
    return (int) (t >> SHARD_BODY_SHIFT);
}  

private static long SHARD_BODY_NUM = 1L << 46;
private static int SHARD_BODY_SHIFT = 64 - (int) (Math.log(SHARD_BODY_NUM) / Math.log(2D));

消息落盘

private static final int SAVE_WAITING_INTERVAL_SHARD_NUM = 2;// 当第i+n个shard创建后,才落盘第i个分区
private static class SaveAJob implements Runnable {
    public static final String SHARD_A_FILE_PREFIX = SAVE_PATH + "A";
    private static final BitSet shardHasSaved = new BitSet();

    public void run() {
        while (true) {
                StringBuilder sb = new StringBuilder();
                NavigableMap<Long, List<Long>> shard = shardsA2SaveQueue.take();
                while(shard.size()==0)
                    Thread.sleep(50);
                int shard2SaveId = getShardAId(shard.firstKey());
                while (shard2SaveId > getShardAId(maxT) - SAVE_WAITING_INTERVAL_SHARD_NUM)
                    Thread.sleep(50);
                saveShard(ShardAHasNotSaved.get(shard2SaveId),
                          sb.append(SHARD_A_FILE_PREFIX).append(shard2SaveId).toString());
                shardHasSaved.set(shard2SaveId);
                ShardAHasNotSaved.remove(shard2SaveId);
                sb.setLength(0);
        }
    }

    private void saveShard(NavigableMap<Long, List<Long>> shard, String fileName) throws IOException {
        Instant s = Instant.now();

        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
        FileChannel outFile = FileChannel.open(Paths.get(fileName), StandardOpenOption.APPEND,
                                               StandardOpenOption.CREATE);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(shard);
        byte[] shardBytes = baos.toByteArray();
        for (int offset = 0; offset < shardBytes.length; offset += 4096) {
            buffer.put(shardBytes, offset, shardBytes.length - offset < 4096 ? shardBytes.length - offset : 4096);
            buffer.flip();
            outFile.write(buffer);
            buffer.clear();
        }
        outFile.close();
        long l = Duration.between(s, Instant.now()).toMillis();
}

数据查询

public List<Message> getMessage(long aMin, long aMax, long tMin, long tMax) {
    if (tMin < minT) tMin = minT;
    if (tMax > maxT) tMax = maxT;

    List<Message> res =
            LongStream.rangeClosed(tMin, tMax)
                      .parallel()
                      .boxed()
                      .flatMap(t -> {
                          int shardId = getShardAId(t);
                          NavigableMap<Long, List<Long>> targetShard = loadShardA(shardId);
                          if (targetShard == null)
                              return null; //如果t不连续,有可能整个分片不存在
                          List<Long> longs = targetShard.get(t);
                          if (longs == null)
                              return null; //如果t不连续,有可能某个t不存在任何Msg

                          int shardIdB = getShardBodyId(t);
                          NavigableMap<Long, List<byte[]>> targetBodyShard = loadShardBody(shardIdB);
                          List<byte[]> bytes = targetBodyShard.get(t);
                          return IntStream.range(0, longs.size())
                                          .mapToObj(i -> new Message(longs.get(i), t, bytes.get(i)));
                      })
                      .filter(msg -> msg.getA() >= aMin && msg.getA() <= aMax)
                      .collect(toList());

    return res;
}
private static NavigableMap<Long, List<Long>> loadShardA(int shardId) {
        shardACount.increment();
        if (!SaveAJob.shardHasSaved.get(shardId)) {//还未落盘
            return ShardAHasNotSaved.get(shardId);
        }
        return cache4A.get(shardId);
}

private static final LinkedHashLRUCache<Integer, NavigableMap<Long, List<Long>>> cache4A =
        new LinkedHashLRUCache<>(SHARD_A_CACHE_SIZE)             
        {
            @Override
            protected NavigableMap<Long, List<Long>> load(Integer shardId) {
                NavigableMap<Long, List<Long>> targetShard = null;
                targetShard = doLoadAShard(SaveAJob.SHARD_A_FILE_PREFIX + shardId);
                return targetShard;
            }
        };

private static NavigableMap<Long, List<Long>> doLoadAShard(String fileName) throws IOException, ClassNotFoundException {
    NavigableMap<Long, List<Long>> navigableMap;
    FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
    ByteBuffer buffer = ByteBuffer.allocate(4096);
    byte[] src = new byte[(int) fileChannel.size()];
    int index = 0;
    while (fileChannel.read(buffer) != -1) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        for (int i = 0; i < bytes.length; i++) {
            src[index++] = bytes[i];
        }
        buffer.clear();
    }
    ByteArrayInputStream bais = new ByteArrayInputStream(src);
    ObjectInputStream ois = new ObjectInputStream(bais);
    navigableMap = (NavigableMap<Long, List<Long>>) ois.readObject();
    fileChannel.close();
    return navigableMap;
}
二版

学习kafka: 不缓存,直接落盘,顺序写,

  • 发现效果一般,同步落盘还是异步落盘?

后期使用索引、学习ssd特性、读kafka源码?

最终获得决赛83/200名

总结与待优化:

  • 插入

分线程存储不同分片&文件,则不需要做排序。搜索时直接合并即可,但要读取多个文件。
参考kafka的索引算法,分为baseIndex和relativeIndex,根据前者定位分片文件,根据后者直接定位到分片文件内的偏移量

  • 存储

存储时做压缩
异步落盘

  • 查询

根据比赛测试程序阶段,在批量读取的时候,提前批量message对象



https://www.xamrdz.com/backend/3h91944670.html

相关文章: