需求分析
将大批量的数据,从一个地方,迁移到另外一个地方,如何处理
主要的涉及到的问题
- 亿级数据怎么存
- 怎么防止重复调度
- 怎么做到负载均衡
- 同一个节点,任务怎么并行
- 如何动态调整并发度
- 机器节点挂了怎么办
概要设计
数据存储
数据导出出来以后怎么存?一般是用文件存或者是数据库存。
数据库存储
对于mysql来说,单表不能太大,一亿行数据拆分成多个表,可以拆分十个表,每一个表大约一千万左右。
问题:
- 遍历数据,对数据库压力巨大
- 跑批数据需要删除,造成数据库的碎片问题
文件存储
采用文件系统存储,将文件拆分成多个,而且读取速度快,因为是磁盘顺序读取的
重复调度问题
调度系统我们一般采用比如xxl-job,通常都失败重试的功能,我们要做到访问重复调度的可能,这里可以设计一张表来存储当前任务,如何已经存,则抛弃掉。表的结构类似这样,还可以使用乐观锁更新状态。
负载均衡
跑批系统怎么上集群,我们可以用MQ,把任务均匀的发布的队列上,在MQ的选型上,我们建议选择redis的list来实现,至于为什么?我们后面会说。
单节点任务并行
使用redis需要自己建一个Event Loop的线程,从redis队列中取任务,然后放入线程池中运行,因为我们使用redis队列缓冲,所以线程池的队列长度设置为0。
为什么选redis队列:
一般的分布式消息队列为了提高效率,会预先取很多消息放在本地,然后使用一个线程把消息放到线程池来处理,这就带来一个问题,如果你的任务太少了的话就会被一个客户端捞取很多任务,使用redis
我们就不会有这个问题,因为Event Loop
线程是任务进行完一个之后,才从redis里面再拿一个。
动态调整并发
动态调整任务并发,有两个地方:
1,是任务调用远程接口的速度,这个速度控制可以使用Thread.sleep
就可以了
2,是任务并发度控制,有多少个线程同时在跑任务,也就是控制线程池数量,我们修改动态修改线程数可能比较困难。另外一个思路是使用开源组件,比较guava的RateLimiter组件,如下图示。
失败重试逻辑
任务失败原因:
1,机器重启,导致任务都终止了
2,调用远程接口失败,超过次数,任务就会停止
3,机器资源不足,内存溢出了
解决上面的问题,两个步骤:记录进度和任务重试
- 记录进度,在子任务执行过程中,需要一直刷新执行到任务哪一行了,同时更新进度的时间。
-
分布式任务系统增加一个补偿任务,定时扫描所有还在执行中的子任务,如果发现任务长时间没有更新,那说明任务终止了。将任务状态改成待执行,在放入消息队列就可以了。
详情设计
跑批任务表设计
我们讲大文件切割,然后得到若干个相对小的的文件。在linux中切割文件命令如下:
按行数切割:
split -l [行数] [输入文件] [输出文件前缀]
。例如,如果你想将一个文件分割成每1万行一个文件,后缀是2位,子文件以child开头,可以使用命令:split -a 2 -l 10000 app.log child
。按文件大小切割:
split -b [大小] [输入文件] [输出文件前缀]
。例如,如果你想将一个log文件切割成每个1M大小,后缀是2位数字结尾的子文件,子文件以child开头,可以使用命令:split -a 2 -d -b 1M app.log.10 child
。
其他常用的选项包括:
-
-a, --suffix-length=N
:指定后缀长度为N (默认为2)。 -
-d, --numeric-suffixes
:使用数字后缀代替字母后缀。 -
--verbose
:在每个输出文件打开前输出文件特征
然后上传的文件系统上,就可以得到文件url地址。
任务表设计信息如下:
CREATE TABLE `t_task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`file_name` varchar(64) NOT NULL DEFAULT '' COMMENT '文件名称',
`task_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0 是初始化 1 运行中 2 已完成',
`task_progress` int(11) NOT NULL DEFAULT '0' COMMENT '读取进度',
`file_url` varchar(128) NOT NULL DEFAULT '' COMMENT '保存文件的url',
`ctime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`utime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
定时任务
我们可以在后台管理系统里面上传我们分割后的文件,文件表一般都是会有一个group
的字段,这里文件我们单独设置一个组,以参数的形式在使用xxl-job
动态的传到执行器里面。任务重复我们可以使用文件名称或者是文件地址作为唯一键,每生产一条任务向MQ投递一条消息。伪代码如下:
@XxlJob("createBatchRunTask")
public void createBatchRunTask(String fileGroup) throws Exception {
List<File> files = fileMapper.findByGroup(fileGroup);
if (CollectionUtils.isEmpty(files)) {
logger.info("暂无文件 {}", fileGroup);
return;
}
for (File f : files) {
String fileName = f.getName();
String url = f.getFileUrl();
Task t = new Task();
t.setFileName(fileName);
try {
//略....
taskMapper.insert(t);
mqServer.send(t);
} catch (Exception e) {
if (e instanceof SQLIntegrityConstraintViolationException) logger.error("重复任务");
else throw e;
}
}
// default success
}
任务处理过程
在监听redis队列中,我们拿到对应的任务,这时候我们需要指定任务的下载的文件目录,同时判断该处理的任务是否存在文件,如果存在可能之前处理过的。如果不存在我们之间下载下来就可以了。
这里就涉及到一个问题,任务可能是补偿任务,不一定是从0行开始的,同时我们是用FileReader+ BufferedReader
逐行读取,这里可以选择两种方案:
- FileReader+ BufferedReader根据任务进度,将已经读过的数据直接跳过不处理,但是还是需要重0行开始读取
- 要更有效地跳过文件的一部分,可以使用
LineNumberReader
类,它是BufferedReader
的子类,提供了一个setLineNumber(int lineNumber)
方法,可以让你直接跳到指定的行。
在逐行读取数据以后,我们需要调用远程接口,一条一条的处理,同时将结果入库。这里我们需要做对应的设计,在落库的时候,我们是批量入库的,我们需要设置数据库刷盘策略innodb_flush_log_at_trx_commit=2
每隔1s刷新到磁盘。同时我们设置mysql的批量插入参数为rewriteBatchedStatements=true
,同时关闭调自动提交。
代码示例如下:
Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/myDatabase?rewriteBatchedStatements=true", "username", "password");
conn.setAutoCommit(false);
String sql = "INSERT INTO myTable VALUES (?, ?)";
PreparedStatement pstmt = conn.prepareStatement(sql);
for (int i = 0; i < 10000; i++) {
pstmt.setString(1, "column1Value");
pstmt.setString(2, "column2Value");
pstmt.addBatch();
if (i % 500 == 0) {
pstmt.executeBatch();
}
}
pstmt.executeBatch();
conn.commit();
但是对应事务的刷盘策略设置,有严格的控制权限,运维老哥也不会让你随便修改数据库的配置,如果这样的话,我们只能使用MyIsam了,如果数据量不大,还是使用innodb也是可以的。
我们在批量插入同时,需要记录当前任务进度详情,还有就是更新时间,方便我们将来排查问题,做任务补偿。
控制任务并发度
在我们资源不足的情况,我们希望任务处理的慢点,或者是不用那么快。这个时候我们需要动态修改对应的值。比如使用nacos或者zookeeper这写配置中心,把对应的参数让配置中心管理起来,动态修改以后能够及时生效的。在任务数量少的情况,我们使用限流组件或者是让任务睡眠是可以的。概要设计已经说的明确,这里不做赘述。
具体措施:
- 任务处理过程中使用
TimeUnit.SECODES.sleep()
; - 任务处理中guava的RateLimiter组件,它是规定时间内,只能有多少个线程处理
- JDK自带并发控制工具,比如
Semaphore
任务补偿
在各种情况下任务发生故障的时候,我们任务进度会停滞,我们需要再创建一个定时任务,扫描任务表中任务状态是运行中,并且是更新时间超过了十几分钟或者半个小时(这个可以自行设置对应的阈值),重新入队就可以了。