当前位置: 首页>后端>正文

跑批系统设计

需求分析

将大批量的数据,从一个地方,迁移到另外一个地方,如何处理


跑批系统设计,第1张
跑批系统

主要的涉及到的问题

  • 亿级数据怎么存
  • 怎么防止重复调度
  • 怎么做到负载均衡
  • 同一个节点,任务怎么并行
  • 如何动态调整并发度
  • 机器节点挂了怎么办

概要设计

数据存储

数据导出出来以后怎么存?一般是用文件存或者是数据库存。

数据库存储

对于mysql来说,单表不能太大,一亿行数据拆分成多个表,可以拆分十个表,每一个表大约一千万左右。
问题:

  • 遍历数据,对数据库压力巨大
  • 跑批数据需要删除,造成数据库的碎片问题

文件存储

采用文件系统存储,将文件拆分成多个,而且读取速度快,因为是磁盘顺序读取的

重复调度问题

调度系统我们一般采用比如xxl-job,通常都失败重试的功能,我们要做到访问重复调度的可能,这里可以设计一张表来存储当前任务,如何已经存,则抛弃掉。表的结构类似这样,还可以使用乐观锁更新状态。

跑批系统设计,第2张
防止重复调度

负载均衡

跑批系统怎么上集群,我们可以用MQ,把任务均匀的发布的队列上,在MQ的选型上,我们建议选择redis的list来实现,至于为什么?我们后面会说。


跑批系统设计,第3张
负载均衡

单节点任务并行

使用redis需要自己建一个Event Loop的线程,从redis队列中取任务,然后放入线程池中运行,因为我们使用redis队列缓冲,所以线程池的队列长度设置为0。
为什么选redis队列:
一般的分布式消息队列为了提高效率,会预先取很多消息放在本地,然后使用一个线程把消息放到线程池来处理,这就带来一个问题,如果你的任务太少了的话就会被一个客户端捞取很多任务,使用redis我们就不会有这个问题,因为Event Loop线程是任务进行完一个之后,才从redis里面再拿一个。

跑批系统设计,第4张
并行

动态调整并发

动态调整任务并发,有两个地方:
1,是任务调用远程接口的速度,这个速度控制可以使用Thread.sleep就可以了
2,是任务并发度控制,有多少个线程同时在跑任务,也就是控制线程池数量,我们修改动态修改线程数可能比较困难。另外一个思路是使用开源组件,比较guava的RateLimiter组件,如下图示。

跑批系统设计,第5张
调整并发数

失败重试逻辑

任务失败原因:
1,机器重启,导致任务都终止了
2,调用远程接口失败,超过次数,任务就会停止
3,机器资源不足,内存溢出了

解决上面的问题,两个步骤:记录进度和任务重试

  • 记录进度,在子任务执行过程中,需要一直刷新执行到任务哪一行了,同时更新进度的时间。
  • 分布式任务系统增加一个补偿任务,定时扫描所有还在执行中的子任务,如果发现任务长时间没有更新,那说明任务终止了。将任务状态改成待执行,在放入消息队列就可以了。


    跑批系统设计,第6张
    失败重试逻辑

详情设计

跑批任务表设计

我们讲大文件切割,然后得到若干个相对小的的文件。在linux中切割文件命令如下:

  1. 按行数切割split -l [行数] [输入文件] [输出文件前缀]。例如,如果你想将一个文件分割成每1万行一个文件,后缀是2位,子文件以child开头,可以使用命令:split -a 2 -l 10000 app.log child

  2. 按文件大小切割split -b [大小] [输入文件] [输出文件前缀]。例如,如果你想将一个log文件切割成每个1M大小,后缀是2位数字结尾的子文件,子文件以child开头,可以使用命令:split -a 2 -d -b 1M app.log.10 child

其他常用的选项包括:

  • -a, --suffix-length=N:指定后缀长度为N (默认为2)。
  • -d, --numeric-suffixes:使用数字后缀代替字母后缀。
  • --verbose:在每个输出文件打开前输出文件特征

然后上传的文件系统上,就可以得到文件url地址。
任务表设计信息如下:

CREATE TABLE `t_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_name` varchar(64) NOT NULL DEFAULT '' COMMENT '文件名称',
  `task_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0 是初始化 1 运行中 2 已完成',
  `task_progress` int(11) NOT NULL DEFAULT '0' COMMENT '读取进度',
  `file_url` varchar(128) NOT NULL DEFAULT '' COMMENT '保存文件的url',
  `ctime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `utime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

定时任务

我们可以在后台管理系统里面上传我们分割后的文件,文件表一般都是会有一个group的字段,这里文件我们单独设置一个组,以参数的形式在使用xxl-job动态的传到执行器里面。任务重复我们可以使用文件名称或者是文件地址作为唯一键,每生产一条任务向MQ投递一条消息。伪代码如下:

    @XxlJob("createBatchRunTask")
    public void createBatchRunTask(String fileGroup) throws Exception {
        List<File> files = fileMapper.findByGroup(fileGroup);
        if (CollectionUtils.isEmpty(files)) {
            logger.info("暂无文件 {}", fileGroup);
            return;
        }
        for (File f : files) {
          String fileName =  f.getName();
          String url = f.getFileUrl();
          Task t = new Task();
          t.setFileName(fileName);
            try {
                //略....
                taskMapper.insert(t);
                mqServer.send(t);
            } catch (Exception e) {
                 if (e instanceof SQLIntegrityConstraintViolationException)   logger.error("重复任务");
                 else throw e;
              
            }
        }
        // default success
    }

任务处理过程

在监听redis队列中,我们拿到对应的任务,这时候我们需要指定任务的下载的文件目录,同时判断该处理的任务是否存在文件,如果存在可能之前处理过的。如果不存在我们之间下载下来就可以了。
这里就涉及到一个问题,任务可能是补偿任务,不一定是从0行开始的,同时我们是用FileReader+ BufferedReader 逐行读取,这里可以选择两种方案:

  • FileReader+ BufferedReader根据任务进度,将已经读过的数据直接跳过不处理,但是还是需要重0行开始读取
  • 要更有效地跳过文件的一部分,可以使用LineNumberReader类,它是BufferedReader的子类,提供了一个setLineNumber(int lineNumber)方法,可以让你直接跳到指定的行。

在逐行读取数据以后,我们需要调用远程接口,一条一条的处理,同时将结果入库。这里我们需要做对应的设计,在落库的时候,我们是批量入库的,我们需要设置数据库刷盘策略innodb_flush_log_at_trx_commit=2每隔1s刷新到磁盘。同时我们设置mysql的批量插入参数为rewriteBatchedStatements=true,同时关闭调自动提交。
代码示例如下:

Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/myDatabase?rewriteBatchedStatements=true", "username", "password");
conn.setAutoCommit(false);
String sql = "INSERT INTO myTable VALUES (?, ?)";
PreparedStatement pstmt = conn.prepareStatement(sql);

for (int i = 0; i < 10000; i++) {
    pstmt.setString(1, "column1Value");
    pstmt.setString(2, "column2Value");
    pstmt.addBatch();

    if (i % 500 == 0) {
        pstmt.executeBatch();
    }
}

pstmt.executeBatch();
conn.commit();

但是对应事务的刷盘策略设置,有严格的控制权限,运维老哥也不会让你随便修改数据库的配置,如果这样的话,我们只能使用MyIsam了,如果数据量不大,还是使用innodb也是可以的。
我们在批量插入同时,需要记录当前任务进度详情,还有就是更新时间,方便我们将来排查问题,做任务补偿。

控制任务并发度

在我们资源不足的情况,我们希望任务处理的慢点,或者是不用那么快。这个时候我们需要动态修改对应的值。比如使用nacos或者zookeeper这写配置中心,把对应的参数让配置中心管理起来,动态修改以后能够及时生效的。在任务数量少的情况,我们使用限流组件或者是让任务睡眠是可以的。概要设计已经说的明确,这里不做赘述。
具体措施:

  • 任务处理过程中使用TimeUnit.SECODES.sleep();
  • 任务处理中guava的RateLimiter组件,它是规定时间内,只能有多少个线程处理
  • JDK自带并发控制工具,比如Semaphore

任务补偿

在各种情况下任务发生故障的时候,我们任务进度会停滞,我们需要再创建一个定时任务,扫描任务表中任务状态是运行中,并且是更新时间超过了十几分钟或者半个小时(这个可以自行设置对应的阈值),重新入队就可以了。


https://www.xamrdz.com/backend/3gg1935772.html

相关文章: