Contents
1. Archive configuration table design
2. Hand-rolled thread pool utility class and the MongodbArchiveService class
3. xxl-job scheduled-task entry point for log archiving
4. Log-archiving handler subclass ArchivePrinterLogService
Whether in regular project development, interface calls, exception records, or history tracking, writing logs is unavoidable. As time passes, the data in the log tables keeps growing, so archiving logs on a regular schedule becomes especially important.
1. Archive configuration table design
To adapt to changing requirements (the archive window and the log expiration time may change at any moment), we design an archive configuration table that stores the archive cycle, the expiration time, and the fully qualified class name of the log handler. The fully qualified class name lets us load the concrete implementation via reflection and call its archive method.
t_archive_rule archive configuration table:
- code: archive type
- name: fully qualified class name of the archive handler subclass
- table_name: name of the log table (collection) to be archived
- archive_day: archive data older than n days before the current time
- archive_type: expiration time of the archived data in days (for example, 90 means archived records are deleted automatically after 90 days)
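The scheduled job later reads these rows through a Feign client as ArchiveHeaderPageResponseFeignResponse objects. That DTO is not shown in this article; a minimal sketch, assuming its fields simply mirror the table columns and the getters used later (getCode, getName, getTableName, getArchiveDaysBefore, getArchiveType), might look like this:

package com.purcotton.feign.vo.response;

import lombok.Data;

/**
 * Hypothetical sketch of an archive rule as returned by the Feign client.
 * Field names are assumptions derived from the getters used in the archiving code.
 */
@Data
public class ArchiveHeaderPageResponseFeignResponse {
    /** archive type (column: code) */
    private String code;
    /** fully qualified class name of the archive handler subclass (column: name) */
    private String name;
    /** log collection to archive (column: table_name) */
    private String tableName;
    /** archive data older than this many days (column: archive_day) */
    private Integer archiveDaysBefore;
    /** expiration of archived data in days (column: archive_type) */
    private Integer archiveType;
}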
2. Hand-rolled thread pool utility class and the MongodbArchiveService class
The archive document entity class:
package com.purcotton.wms.recording.domain;
import com.purcotton.wms.general.domian.BaseDocEntity;
import lombok.*;
import org.springframework.data.mongodb.core.index.IndexDirection;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.Date;
/**
* Archived printer-log document
*
* @author : lovezi0 2021/7/23 13:59
* @version : 1.0
**/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Document(collection = "archive_printer_log")
public class ArchivePrinterLogDoc extends BaseDocEntity {
/**
* Print parameters
*/
private String params;
/**
* Report code
*/
private String reportCode;
/**
* Print payload
*/
private String body;
/**
* Error message
*/
private String error;
/**
* Warehouse code
*/
private String whCode;
/**
* Number of times already printed
*/
private Integer alreadyPrintCount;
/**
* Print time
*/
private String printTime;
/**
* Expiration time (TTL index field; documents are removed 90 days after this date)
*/
@Indexed(direction = IndexDirection.DESCENDING, expireAfterSeconds = 90 * 24 * 60 * 60)
private Date expreDate;
}
The thread pool utility class:
package com.purcotton.wms.recording.util;
import io.netty.util.concurrent.DefaultThreadFactory;
// assumption: CallerBlocksPolicy comes from Spring Integration; replace with the project's own implementation if different
import org.springframework.integration.util.CallerBlocksPolicy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Thread pool factory for the log-archiving tasks
*
* @author 500007
* @date 2021-12-21 14:44:46
*/
public class ThreadPoolUtil {
/**
* Creates the large-task thread pool: each log-table archiving task runs on its own thread.
* @param poolName thread name prefix
* @return the executor
*/
public static ThreadPoolExecutor generateSchTaskThreadPoolExecutor(String poolName) {
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
2 * Runtime.getRuntime().availableProcessors() + 1,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new DefaultThreadFactory(poolName),
// when the bounded queue is full, run the task on the submitting thread
new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* Creates the small-task thread pool: each page of data is processed by one thread. When the pool
* and its queue are full, the rejection policy blocks the submitting thread while retrying to
* enqueue the task, and throws an exception if it still cannot be enqueued within the configured
* wait (here 20 seconds).
* @param poolName thread name prefix
* @return the executor
*/
public static ThreadPoolExecutor generateSmallTaskThreadPoolExecutor(String poolName) {
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
2 * Runtime.getRuntime().availableProcessors() + 1,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20000),
new DefaultThreadFactory(poolName),
// block the caller up to the given wait (assumed milliseconds) before rejecting
new CallerBlocksPolicy(20000));
}
}
The MongodbArchiveService class contains shared helper methods: building the archive query and creating the expiration (TTL) index.
package com.purcotton.wms.recording.service.impl;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.wms.general.factory.WmsExceptionFactory;
import com.purcotton.wms.general.utils.DateUtil;
import com.purcotton.wms.recording.constants.LogCommConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.*;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.lang.reflect.Field;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.purcotton.wms.general.constant.DateConstant.DEFAULT_DATE_TIME;
/**
* Shared helpers for MongoDB log archiving: query construction and TTL index creation on archive collections.
*
* @author 500007
* @date 2021-12-20 19:59:56
*/
@Service
@Slf4j
public class MongodbArchiveService {
@Autowired
private MongoTemplate mongoTemplate;
/**
* Creates the expiration (TTL) index on an archive collection via mongoTemplate.
*
* @param entityClass    document class of the archive collection
* @param collectionName collection name; falls back to the class's @Document annotation when blank
* @param expreDays      expiration in days; overrides the value from the @Indexed annotation when present
*/
public void createIndex(Class<?> entityClass, String collectionName, Integer expreDays) {
//if collectionName is blank, fall back to the @Document annotation on the class
if (StringUtils.isBlank(collectionName)) {
Document document = entityClass.getDeclaredAnnotation(Document.class);
collectionName = document.collection();
}
if (StringUtils.isBlank(collectionName)) {
throw WmsExceptionFactory.getException("Failed to create index: collectionName must not be empty");
}
//list the declared fields
Field[] declaredFields = entityClass.getDeclaredFields();
if (ArrayUtil.isEmpty(declaredFields)) {
return;
}
//index operations for the target collection
IndexOperations indexOperations = this.mongoTemplate.indexOps(collectionName);
//existing indexes
List<IndexInfo> indexInfo = indexOperations.getIndexInfo();
//existing index names
List<String> indexNameList = indexInfo.stream().map(IndexInfo::getName).collect(Collectors.toList());
for (Field field : declaredFields) {
boolean annotationPresent = field.isAnnotationPresent(Indexed.class);
if (annotationPresent) {
//indexed field name
String name = field.getName();
Indexed indexed = field.getDeclaredAnnotation(Indexed.class);
//index expiration time in seconds
int expireAfterSeconds;
if (expreDays != null) {
expireAfterSeconds = expreDays * 24 * 60 * 60;
} else {
expireAfterSeconds = indexed.expireAfterSeconds();
}
//index sort direction
IndexDirection indexDirection = indexed.direction();
Sort.Direction direction = indexDirection == IndexDirection.DESCENDING ? Sort.Direction.DESC : Sort.Direction.ASC;
//numeric direction suffix used in MongoDB's default index names
String i = indexDirection == IndexDirection.DESCENDING ? LogCommConstants.DESC : LogCommConstants.ASC;
//skip if an index named "<field>_<direction>" already exists
boolean flag = indexNameList.contains(name + StrUtil.UNDERLINE + i);
if (!flag) {
Index index = new Index(name, direction);
index.expire(expireAfterSeconds, TimeUnit.SECONDS);
//create the index
indexOperations.ensureIndex(index);
}
}
}
}
/**
* Computes the archive cutoff timestamp: current time minus x days.
* @param header archive configuration entry
* @param now    current time
* @return cutoff timestamp in epoch milliseconds
*/
public Long calcEndStamp(ArchiveHeaderPageResponseFeignResponse header, Date now) {
//data created before this cutoff will be archived
Integer archiveDate = header.getArchiveDaysBefore();
if (ObjectUtils.isEmpty(archiveDate)) {
throw WmsExceptionFactory.getException("The archive window (days) for printer logs must not be empty");
}
Date endDate = DateUtils.addDays(now, -archiveDate);
String endTimeStr = DateUtil.toString(endDate, DEFAULT_DATE_TIME);
Long endStamp = null;
if (StringUtils.isNotBlank(endTimeStr)) {
endStamp = DateUtil.parseStringToDateTime(endTimeStr, DEFAULT_DATE_TIME).toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
return endStamp;
}
/**
* Builds the query for the records to be archived.
* @param endStamp archive cutoff timestamp in epoch milliseconds; may be null
* @return the query, sorted by createTime descending
*/
public Query createQuery(Long endStamp) {
Query query = new Query();
if (null != endStamp) {
query.addCriteria(Criteria.where("createTime").lte(endStamp));
}
query.with(Sort.by(Sort.Direction.DESC, "createTime"));
return query;
}
}
3. xxl-job scheduled-task entry point for log archiving
package com.purcotton.wms.recording.schedule;
import com.purcotton.feign.client.ArchiveHeaderFeignClient;
import com.purcotton.feign.vo.request.ArchiveHeaderPageRequestFeignRequest;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.obj.response.PageResponse;
import com.purcotton.obj.response.ResponseData;
import com.purcotton.wms.recording.constants.LogCommConstants;
import com.purcotton.wms.recording.util.ThreadPoolUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
/**
* xxl-job entry point for the log-archiving job
*
* @author: ww
* @date: 2021/12/20
*/
@Slf4j
@Component
public class AchivedLogHandler implements ApplicationContextAware {
@Autowired
private ArchiveHeaderFeignClient archiveHeaderFeignClient;
private static ApplicationContext applicationContext;
private static CountDownLatch COUNT_DOWN_LATCH;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
AchivedLogHandler.applicationContext = applicationContext;
}
/**
* Generic log-archiving entry method.
* Reflectively invokes the concrete archive method of each handler configured in the archive rule table.
*/
@XxlJob("achivedLogHandler")
public ReturnT<String> achivedLogCommon(String code) {
final StopWatch stopWatch = new StopWatch("achivedLogHandler");
log.info("achivedLogHandler: log archiving started, archive code: {}", code);
stopWatch.start();
ArchiveHeaderPageRequestFeignRequest param = new ArchiveHeaderPageRequestFeignRequest();
param.setPageNum(1);
param.setPageSize(999);
param.setCode(LogCommConstants.ARCHIVE_HEADER_CODE);
ResponseData<PageResponse<ArchiveHeaderPageResponseFeignResponse>> responseData = this.archiveHeaderFeignClient.findPage(param);
if (responseData == null || responseData.getData() == null) {
log.error("achivedLogHandler_error:【数据归档】归档编码{}不存在对应的归档设置", code);
return ReturnT.SUCCESS;
}
PageResponse<ArchiveHeaderPageResponseFeignResponse> archiveHeader = responseData.getData();
List<ArchiveHeaderPageResponseFeignResponse> headerList = archiveHeader.getList();
if (CollectionUtils.isNotEmpty(headerList)) {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.generateSchTaskThreadPoolExecutor("achivedLogHandler");
COUNT_DOWN_LATCH = new CountDownLatch(headerList.size());
String tableName = "";
try {
for (ArchiveHeaderPageResponseFeignResponse header : headerList) {
String className = header.getName();
tableName = header.getTableName();
Class cls = Class.forName(className);
Object bean = applicationContext.getBean(cls);
Method mothod = bean.getClass().getDeclaredMethod(LogCommConstants.ARCHIVE_LOG_METHOD, ArchiveHeaderPageResponseFeignResponse.class);
threadPoolExecutor.execute(() -> {
try {
mothod.invoke(bean, header);
} catch (Exception e) {
log.error("achivedLogHandler_error:归档日志线程池任务执行失败,表名称为:" + header.getTableName(), e);
} finally {
COUNT_DOWN_LATCH.countDown();
}
});
}
} catch (Exception e) {
log.error("achivedLogHandler_error:定时任务归档日志反射执行失败,表名称为:" + tableName, e);
} finally {
try {
COUNT_DOWN_LATCH.await();
threadPoolExecutor.shutdown();
log.info("achivedLogHandler:日志归档任务执行结束,配置归档code:{}", code);
} catch (InterruptedException e) {
log.error("achivedLogHandler_error:定时任务归档线程池关闭异常", e);
}
stopWatch.stop();
log.info("achivedLogHandler:日志归档定时任务执行结束,总耗时:{}秒", stopWatch.getTotalTimeSeconds());
}
}
return ReturnT.SUCCESS;
}
}
To speed up log archiving, a thread pool is used here: first the archive configuration list is queried by code, then the loop takes each entry's name (the fully qualified class name of the handler), looks up the handler bean in the Spring context, and invokes its archive method via reflection.
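The LogCommConstants class referenced throughout (ARCHIVE_HEADER_CODE, ARCHIVE_LOG_METHOD, ARCHIVE_PAGE_SIZE, DESC, ASC) is not shown in the article. A minimal sketch of what it could contain; every value below is an assumption:

package com.purcotton.wms.recording.constants;

/**
 * Hypothetical sketch of the constants used by the archiving code.
 * The real values are not shown in the article; the ones below are assumptions.
 */
public final class LogCommConstants {
    /** code of the archive rules to load from t_archive_rule (assumed value) */
    public static final String ARCHIVE_HEADER_CODE = "LOG_ARCHIVE";
    /** name of the archive method invoked reflectively on each handler */
    public static final String ARCHIVE_LOG_METHOD = "acnhiveLogOperation";
    /** page size used when archiving a collection in batches (assumed value) */
    public static final int ARCHIVE_PAGE_SIZE = 1000;
    /** numeric direction suffixes MongoDB uses in default index names, e.g. "expreDate_-1" */
    public static final String DESC = "-1";
    public static final String ASC = "1";

    private LogCommConstants() {
    }
}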
4. Log-archiving handler subclass ArchivePrinterLogService
Earlier we configured six archive rule entries, which means there are six corresponding archive handler subclasses, each implementing its own archiving logic. Only ArchivePrinterLogService is shown here; the other subclasses follow the same pattern.
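Each of those subclasses implements a common interface, AcnhiveLogCommeService, whose source is not included in the article. Judging from the method invoked reflectively above, a minimal sketch could look like this:

package com.purcotton.wms.recording.service;

import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;

/**
 * Common contract for the per-table archive handlers (a sketch; the real
 * interface is not shown in the article and may declare more members).
 */
public interface AcnhiveLogCommeService {

    /**
     * Archives one log collection according to its archive rule.
     *
     * @param header the archive rule loaded from t_archive_rule
     */
    void acnhiveLogOperation(ArchiveHeaderPageResponseFeignResponse header);
}

With that contract in mind, here is the ArchivePrinterLogService implementation: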
package com.purcotton.wms.recording.service.impl;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.wms.general.domian.PrinterLogDoc;
import com.purcotton.wms.general.factory.WmsExceptionFactory;
import com.purcotton.wms.general.service.impl.BaseMongoService;
import com.purcotton.wms.recording.constants.LogCommConstants;
import com.purcotton.wms.recording.domain.ArchivePrinterLogDoc;
import com.purcotton.wms.recording.service.AcnhiveLogCommeService;
import com.purcotton.wms.recording.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Printer log archiving handler
*
* @author 500007
* @date 2021-12-20 16:16:20
*/
@Service
@Slf4j
public class ArchivePrinterLogService extends BaseMongoService<ArchivePrinterLogDoc> implements AcnhiveLogCommeService {
@Autowired
private MongodbArchiveService mongodbArchiveService;
@Override
public String getCollectionsName() {
return "archive_printer_log";
}
/**
* Number of records actually archived in this run
*/
private AtomicInteger archiveTotal = null;
@Override
public void acnhiveLogOperation(ArchiveHeaderPageResponseFeignResponse header) {
log.info("achivedLogHandler:{}表开始归档{}天前的日志", getCollectionsName(), header.getArchiveDaysBefore());
StopWatch stopWatch = new StopWatch(getCollectionsName());
stopWatch.start();
long totalCount = 0;
archiveTotal = new AtomicInteger(0);
try {
//失效时间
Integer expreDays = header.getArchiveType();
//分页查询并处理数据
totalCount = this.processLogData(LogCommConstants.ARCHIVE_PAGE_SIZE, header);
//如果存在索引,则创建索引
mongodbArchiveService.createIndex(entityClass, getCollectionsName(), expreDays);
} catch (Exception e) {
log.error("achivedLogHandler_error:打印日志归档失败", e);
} finally {
stopWatch.stop();
log.info("achivedLogHandler:{}表完成{}天前的日志归档,共计归档{}条数据,耗时:{}秒", getCollectionsName(), header.getArchiveDaysBefore(), totalCount, stopWatch.getTotalTimeSeconds());
}
}
private long processLogData(int pageSize, ArchiveHeaderPageResponseFeignResponse header) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.generateSmallTaskThreadPoolExecutor(getCollectionsName());
CountDownLatch countDownLatch = null;
long count = 0;
try {
//count the records to archive
final Date now = new Date();
Long endStamp = mongodbArchiveService.calcEndStamp(header, now);
log.info("achivedLogHandler: printer log archive cutoff timestamp {}", endStamp);
Query query = mongodbArchiveService.createQuery(endStamp);
count = mongoTemplate.count(query, PrinterLogDoc.class);
if (count <= 0) {
return 0;
}
//compute the number of pages
long page = count % LogCommConstants.ARCHIVE_PAGE_SIZE == 0 ? count / LogCommConstants.ARCHIVE_PAGE_SIZE : count / LogCommConstants.ARCHIVE_PAGE_SIZE + 1;
countDownLatch = new CountDownLatch((int) page);
for (int i = 1; i <= page; i++) {
Query subQuery = mongodbArchiveService.createQuery(endStamp);
subQuery.limit(LogCommConstants.ARCHIVE_PAGE_SIZE);
subQuery.skip((long) (i - 1) * pageSize);
try {
threadPoolExecutor.execute(new ArchiveLogRunnable(subQuery, this, countDownLatch));
} catch (Exception e) {
log.error("achivedLogHandler_error: failed to submit printer log archive batch, threadPoolName: " + getCollectionsName(), e);
}
}
//main thread waits for all page tasks to finish
countDownLatch.await();
log.info("achivedLogHandler: printer log source count {}, actually archived {}", count, archiveTotal.get());
if (count == archiveTotal.get()) {
//remove the archived records from the source collection
mongoTemplate.remove(query, PrinterLogDoc.class);
} else {
throw WmsExceptionFactory.getException("achivedLogHandler_error: printer log source count and archived count differ, threadPoolName: " + getCollectionsName());
}
} catch (Exception e) {
log.error("achivedLogHandler_error: printer log archiving failed, threadPoolName: " + getCollectionsName(), e);
} finally {
threadPoolExecutor.shutdown();
}
return count;
}
class ArchiveLogRunnable implements Runnable {
private Query query;
private ArchivePrinterLogService archivePrinterLogService;
private CountDownLatch countDownLatch;
public ArchiveLogRunnable(Query query, ArchivePrinterLogService archivePrinterLogService, CountDownLatch countDownLatch) {
this.query = query;
this.archivePrinterLogService = archivePrinterLogService;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
List<PrinterLogDoc> list = mongoTemplate.find(query, PrinterLogDoc.class);
if (!CollectionUtils.isEmpty(list)) {
List<ArchivePrinterLogDoc> archiveList = new ArrayList<ArchivePrinterLogDoc>(list.size());
//copy each source record into an archive document
list.forEach(printerLogDoc -> {
ArchivePrinterLogDoc archivePrinterLogDoc = new ArchivePrinterLogDoc();
BeanUtils.copyProperties(printerLogDoc, archivePrinterLogDoc);
archivePrinterLogDoc.setExpreDate(new Date());
archiveList.add(archivePrinterLogDoc);
});
//batch-insert into the archive collection
Collection<ArchivePrinterLogDoc> insertAll = mongoTemplate.insertAll(archiveList);
if (!CollectionUtils.isEmpty(insertAll)) {
archiveTotal.addAndGet(insertAll.size());
}
}
} catch (Exception e) {
log.error("achivedLogHandler_error: printer log archive batch task failed", e);
} finally {
this.query = null;
this.countDownLatch.countDown();
}
}
}
}
Likewise, to improve archiving throughput, the data is read in pages and the pages are processed in parallel with a thread pool.
Note: we need to create suitable indexes on the original log collections beforehand; otherwise, when the data volume is large, the paged queries (which sort by createTime) can throw an error about exceeding MongoDB's maximum byte/memory limit.
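As an illustration, such an index can be created programmatically on the source collection, for example during application startup. This is only a sketch under assumptions: the collection name printer_log and the initializer class are hypothetical and not part of the article's code.

import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.Index;

/**
 * Hypothetical helper that makes sure the source collection has an index on the
 * field the archive query filters and sorts on (createTime), so that paged
 * queries over a large collection stay within MongoDB's sort limits.
 */
public class SourceLogIndexInitializer {

    private final MongoTemplate mongoTemplate;

    public SourceLogIndexInitializer(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public void ensureCreateTimeIndex() {
        // "printer_log" is an assumed collection name for PrinterLogDoc
        mongoTemplate.indexOps("printer_log")
                .ensureIndex(new Index("createTime", Sort.Direction.DESC));
    }
}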