当前位置: 首页>数据库>正文

mongodb的二级表名 mongodb表设计

        

目录

一、归档配置表设计

二、手动创建线程池工具类 和 MongodbArchiveService 类

三、xxl-job日志归档定时任务入口

四、日志归档处理子类ArchivePrinterLogService


       在项目开发亦或是接口调用,异常记录,历史信息记录都难免不了日志的记录,随着时间的推移,日志表中的数据会变的越来越多,日志定期归档就显得尤为重要。

一、归档配置表设计

为了能够适应需求的变化,归档日期和日志的失效时间可能随时会有变更,我们需要设计一个归档配置表,用来存储日志归档周期,失效时间,以及日志处理类名全路径等信息(这里全路径信息可以利用反射方式获得具体的实现类调用归档接口)

t_archive_rule 归档配置表

code :归档类型,name:归档子类全路径名称 ,table_name:需要归档的日志表名,

archive_day:归档距离当前时间n天之前的数据,archive_type:归档数据失效时间(超过90天自动删除)

mongodb的二级表名 mongodb表设计,mongodb的二级表名 mongodb表设计_mongodb,第1张

 二、手动创建线程池工具类 和 MongodbArchiveService 类

基础实体类

package com.purcotton.wms.recording.domain;

import com.purcotton.wms.general.domian.BaseDocEntity;
import lombok.*;
import org.springframework.data.mongodb.core.index.IndexDirection;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Date;

/**
 * 打印日志对象
 *
 * @author : lovezi0 2021/7/23 13:59
 * @version : 1.0
 **/
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Document(collection = "archive_printer_log")
public class ArchivePrinterLogDoc extends BaseDocEntity {
    /**
     * 打印参数
     */
    private String params;

    /**
     * 报表代码
     */
    private String reportCode;

    /**
     * 打印数据
     */
    private String body;

    /**
     * 错误
     */
    private String error;

    /**
     * 仓库编码
     */
    private String whCode;

    /**
     * 已打印的次数
     */
    private Integer alreadyPrintCount;

    /**
     * 打印时间
     */
    private String printTime;

    /**
     * 过期时间
     */
    @Indexed(direction = IndexDirection.DESCENDING,expireAfterSeconds = 90*24*60*60)
    private Date expreDate;

}

线程池工具类 

package com.purcotton.wms.recording.util;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 500007
 * @ClassName:
 * @Description:
 * @date 2021年12月21日 14:44:46
 */
public class ThreadPoolUtil {

   /**
     * 创建大任务线程池,每个日志表归档任务开启一个线程
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor generateSchTaskThreadPoolExecutor(String poolName) {
        return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                2 * Runtime.getRuntime().availableProcessors() + 1,
                10, TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                new DefaultThreadFactory(poolName),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * 创建小任务线程池,每页数据需要一个线程处理,线程池满的情况下,采用最多等待60秒的方式重新入阻塞队列,超过60秒
     * 未入队列则抛异常
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor generateSmallTaskThreadPoolExecutor(String poolName) {
        return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                2 * Runtime.getRuntime().availableProcessors() + 1,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(20000),
                new DefaultThreadFactory(poolName),
                new CallerBlocksPolicy(20000));
    }
}

 MongodbArchiveService类,一些公共方法,创建查询请求体和创建失效时间索引方法

package com.purcotton.wms.recording.service.impl;

import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.wms.general.factory.WmsExceptionFactory;
import com.purcotton.wms.general.utils.DateUtil;
import com.purcotton.wms.recording.constants.LogCommConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.*;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.lang.reflect.Field;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.purcotton.wms.general.constant.DateConstant.DEFAULT_DATE_TIME;

/**
 * @author 500007
 * @ClassName:
 * @Description:
 * @date 2021年12月20日 19:59:56
 */
@Service
@Slf4j
public class MongodbArchiveService {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 通过mongoTemplate创建失效时间索引
     *
     * @param entityClass
     * @param collectionName
     */
    public void createIndex(Class entityClass, String collectionName,Integer expreDays) {
        //如果传入collectionName为空则从类的Document注解获取
        if (StringUtils.isBlank(collectionName)) {
            Document document = (Document) entityClass.getDeclaredAnnotation(Document.class);
            collectionName = document.collection();
        }
        if (StringUtils.isBlank(collectionName)) {
            throw WmsExceptionFactory.getException("创建索引失败,collectionName不能为空");
        }
        //获取字段列表
        Field[] declaredFields = entityClass.getDeclaredFields();
        if (ArrayUtil.isEmpty(declaredFields)) {
            return;
        }
        //获取索引操作器
        IndexOperations indexOperations = this.mongoTemplate.indexOps(collectionName);
        //获取索引列表
        List<IndexInfo> indexInfo = indexOperations.getIndexInfo();
        //转为索引名称list
        List<String> indexNameList = indexInfo.stream().map(IndexInfo::getName).collect(Collectors.toList());
        for (Field field : declaredFields) {
            boolean annotationPresent = field.isAnnotationPresent(Indexed.class);
            if (annotationPresent) {
                //索引字段名称
                String name = field.getName();
                Indexed indexed = field.getDeclaredAnnotation(Indexed.class);
                //索引过期时间
                int expireAfterSeconds;
                if(expreDays!=null){
                    expireAfterSeconds = expreDays*24*60*60;
                }else {
                    expireAfterSeconds =  indexed.expireAfterSeconds();
                }
                //获取索引排序规则
                IndexDirection indexDirection = indexed.direction();
                Sort.Direction direction = indexDirection.compareTo(IndexDirection.DESCENDING) == 0 ? Sort.Direction.DESC : Sort.Direction.ASC;
                //数字型排序规则
                String i = indexDirection.compareTo(IndexDirection.DESCENDING) == 0 ? LogCommConstants.DESC : LogCommConstants.ASC;
                //判断索引是否已经存在
                boolean flag = indexNameList.contains(name + StrUtil.UNDERLINE + i);
                if (!flag) {
                    Index index = new Index(name, direction);
                    index.expire(expireAfterSeconds, TimeUnit.SECONDS);
                    //创建索引
                    indexOperations.ensureIndex(index);
                }
            }
        }
    }


    /**
     * 计算归档日期,当前时间-x天
     * @param header
     * @param now
     * @return
     */
    public Long  calcEndStamp(ArchiveHeaderPageResponseFeignResponse header,Date now){
        //获取归档日期,归档该日期之前的数据
        Integer archiveDate = header.getArchiveDaysBefore();
        if (ObjectUtils.isEmpty(archiveDate)) {
            throw WmsExceptionFactory.getException("打印日志归档时间不能为空");
        }

        Date endDate = DateUtils.addDays(now, -archiveDate);

        String endTimeStr = DateUtil.toString(endDate, DEFAULT_DATE_TIME);
        Long endStamp = null;
        if (StringUtils.isNotBlank(endTimeStr)) {
            endStamp = DateUtil.parseStringToDateTime(endTimeStr, DEFAULT_DATE_TIME).toInstant(ZoneOffset.of("+8")).toEpochMilli();
        }
        return endStamp;
    }

    /**
     * 构建查询参数
     * @param endStamp
     * @return
     */
    public Query createQuery(Long endStamp) {
        Query query = new Query();
        if (null != endStamp) {
            query.addCriteria(Criteria.where("createTime").lte(endStamp));
        }
        query.with(Sort.by(Sort.Direction.DESC, "createTime"));
        return query;
    }
}

三、xxl-job日志归档定时任务入口

package com.purcotton.wms.recording.schedule;

import com.purcotton.feign.client.ArchiveHeaderFeignClient;
import com.purcotton.feign.vo.request.ArchiveHeaderPageRequestFeignRequest;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.obj.response.PageResponse;
import com.purcotton.obj.response.ResponseData;
import com.purcotton.wms.recording.constants.LogCommConstants;
import com.purcotton.wms.recording.util.ThreadPoolUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: ww
 * @date: 2021/12/20
 * @desc :
 */
@Slf4j
@Component
public class AchivedLogHandler implements ApplicationContextAware {

 @Autowired
    private ArchiveHeaderFeignClient archiveHeaderFeignClient;

    private static ApplicationContext applicationContext;

    private static CountDownLatch COUNT_DOWN_LATCH;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 日志记录归档通用方法
     * 按配置表配置反射调用具体归档日志方法
     */
    @XxlJob("achivedLogHandler")
    public ReturnT<String> achivedLogCommon(String code) {
        final StopWatch stopWatch = new StopWatch("achivedLogHandler");
        log.info("achivedLogHandler:日志归档执行开始,配置归档code:{}", code);
        stopWatch.start();
        ArchiveHeaderPageRequestFeignRequest param = new ArchiveHeaderPageRequestFeignRequest();
        param.setPageNum(1);
        param.setPageSize(999);
        param.setCode(LogCommConstants.ARCHIVE_HEADER_CODE);
        ResponseData<PageResponse<ArchiveHeaderPageResponseFeignResponse>> responseData = this.archiveHeaderFeignClient.findPage(param);
        if (responseData == null || responseData.getData() == null) {
            log.error("achivedLogHandler_error:【数据归档】归档编码{}不存在对应的归档设置", code);
            return ReturnT.SUCCESS;
        }
        PageResponse<ArchiveHeaderPageResponseFeignResponse> archiveHeader = responseData.getData();
        List<ArchiveHeaderPageResponseFeignResponse> headerList = archiveHeader.getList();
        if (CollectionUtils.isNotEmpty(headerList)) {
            ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.generateSchTaskThreadPoolExecutor("achivedLogHandler");
            COUNT_DOWN_LATCH = new CountDownLatch(headerList.size());
            String tableName = "";
            try {
                for (ArchiveHeaderPageResponseFeignResponse header : headerList) {
                    String className = header.getName();
                    tableName = header.getTableName();
                    Class cls = Class.forName(className);
                    Object bean = applicationContext.getBean(cls);
                    Method mothod = bean.getClass().getDeclaredMethod(LogCommConstants.ARCHIVE_LOG_METHOD, ArchiveHeaderPageResponseFeignResponse.class);
                    threadPoolExecutor.execute(() -> {
                        try {
                            mothod.invoke(bean, header);
                        } catch (Exception e) {
                            log.error("achivedLogHandler_error:归档日志线程池任务执行失败,表名称为:" + header.getTableName(), e);
                        } finally {
                            COUNT_DOWN_LATCH.countDown();
                        }
                    });
                }
            } catch (Exception e) {
                log.error("achivedLogHandler_error:定时任务归档日志反射执行失败,表名称为:" + tableName, e);
            } finally {
                try {
                    COUNT_DOWN_LATCH.await();
                    threadPoolExecutor.shutdown();
                    log.info("achivedLogHandler:日志归档任务执行结束,配置归档code:{}", code);
                } catch (InterruptedException e) {
                    log.error("achivedLogHandler_error:定时任务归档线程池关闭异常", e);
                }
                stopWatch.stop();
                log.info("achivedLogHandler:日志归档定时任务执行结束,总耗时:{}秒", stopWatch.getTotalTimeSeconds());
            }
        }
        return ReturnT.SUCCESS;
    }


}

为了提升日志归档的效率,这里采用线程池进行处理,首选根据code查询所需归档的配置列表,for循环遍历获得每个name,根据name从spring上下文中获取归档任务处理子类的全路径名,通过反射调用任务归档方法。

四、日志归档处理子类ArchivePrinterLogService

前面我们配置了6个日志归档配置数据,也就是说我们会有6个对应的日志归档子类分别进行归档逻辑处理,这里我们仅以一个ArchivePrinterLogService代码作为展示。其他子类归档逻辑类似。

package com.purcotton.wms.recording.service.impl;

import cn.hutool.core.util.ObjectUtil;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.wms.general.domian.PrinterLogDoc;
import com.purcotton.wms.general.service.impl.BaseMongoService;
import com.purcotton.wms.recording.constants.LogCommConstants;
import com.purcotton.wms.recording.domain.ArchivePrinterLogDoc;
import com.purcotton.wms.recording.domain.ShipmentCheckPackingScaledLogDoc;
import com.purcotton.wms.recording.service.AcnhiveLogCommeService;
import com.purcotton.wms.recording.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 500007
 * @ClassName:
 * @Description: 打印日志归档
 * @date 2021年12月20日 16:16:20
 */
@Service
@Slf4j
public class ArchivePrinterLogService extends BaseMongoService<ArchivePrinterLogDoc> implements AcnhiveLogCommeService {

 @Autowired
    private MongodbArchiveService mongodbArchiveService;

    @Override
    public String getCollectionsName() {
        return "archive_printer_log";
    }

    /**
     * 实际归档数量
     */
    private AtomicInteger archiveTotal = null;

    @Override
    public void acnhiveLogOperation(ArchiveHeaderPageResponseFeignResponse header) {
        log.info("achivedLogHandler:{}表开始归档{}天前的日志", getCollectionsName(), header.getArchiveDaysBefore());
        StopWatch stopWatch = new StopWatch(getCollectionsName());
        stopWatch.start();
        long totalCount = 0;
        archiveTotal = new AtomicInteger(0);
        try {
            //失效时间
            Integer expreDays = header.getArchiveType();
            //分页查询并处理数据
            totalCount = this.processLogData(LogCommConstants.ARCHIVE_PAGE_SIZE, header);
            //如果存在索引,则创建索引
            mongodbArchiveService.createIndex(entityClass, getCollectionsName(), expreDays);
        } catch (Exception e) {
            log.error("achivedLogHandler_error:打印日志归档失败", e);
        } finally {
            stopWatch.stop();
            log.info("achivedLogHandler:{}表完成{}天前的日志归档,共计归档{}条数据,耗时:{}秒", getCollectionsName(), header.getArchiveDaysBefore(), totalCount, stopWatch.getTotalTimeSeconds());
        }
    }

    private long processLogData(int pageSize, ArchiveHeaderPageResponseFeignResponse header) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.generateSmallTaskThreadPoolExecutor(getCollectionsName());
        CountDownLatch countDownLatch = null;
        long count = 0;
        try {
            //获取总数量
            final Date now = new Date();
            Long endStamp = mongodbArchiveService.calcEndStamp(header, now);
            log.info("achivedLogHandler:打印日志归档时间戳{}",endStamp);
            Query query = mongodbArchiveService.createQuery(endStamp);
            count = mongoTemplate.count(query, PrinterLogDoc.class);
            if (count <= 0) {
                return 0;
            }
            //计算页数
            long page = count % LogCommConstants.ARCHIVE_PAGE_SIZE == 0 ? count / LogCommConstants.ARCHIVE_PAGE_SIZE : count / LogCommConstants.ARCHIVE_PAGE_SIZE + 1;
            countDownLatch = new CountDownLatch((int) page);
            for (int i = 1; i <= page; i++) {
                Query subQuery = mongodbArchiveService.createQuery(endStamp);
                subQuery.limit(LogCommConstants.ARCHIVE_PAGE_SIZE);
                subQuery.skip((long) (i - 1) * pageSize);
                try {
                    threadPoolExecutor.execute(new archiveLogRunable(subQuery, this, countDownLatch));
                } catch (Exception e) {
                    log.error("achivedLogHandler_error:打印日志归档异常,threadPoolName:" + getCollectionsName(), e);
                }
            }
            //主线程等待
            countDownLatch.await();
            log.info("achivedLogHandler:打印日志归档原表数据总数{},实际归档总数{}", count, archiveTotal.get());
            if (count == archiveTotal.get()) {
                //删除原表旧数据
                mongoTemplate.remove(query, PrinterLogDoc.class);
            } else {
                throw WmsExceptionFactory.getException("achivedLogHandler_error:打印日志归档原表数据和归档数据不一致,threadPoolName:" + getCollectionsName());
            }
        } catch (Exception e) {
            log.error("achivedLogHandler_error:打印日志归档异常,threadPoolName:" + getCollectionsName(), e);
        } finally {
            threadPoolExecutor.shutdown();
        }
        return count;
    }

    class archiveLogRunable implements Runnable {
        private Query query;
        private ArchivePrinterLogService archivePrinterLogService;
        private CountDownLatch countDownLatch;

        public archiveLogRunable(Query query, ArchivePrinterLogService archivePrinterLogService, CountDownLatch countDownLatch) {
            this.query = query;
            this.archivePrinterLogService = archivePrinterLogService;
            this.countDownLatch = countDownLatch;
        }


        @Override
        public void run() {
            try {
                List<PrinterLogDoc> list = mongoTemplate.find(query, PrinterLogDoc.class);
                if (!CollectionUtils.isEmpty(list)) {
                    List<ArchivePrinterLogDoc> archiveList = new ArrayList<ArchivePrinterLogDoc>(list.size());
                    //入库到归档表中
                    list.forEach(printerLogDoc -> {
                        ArchivePrinterLogDoc archivePrinterLogDoc = new ArchivePrinterLogDoc();
                        BeanUtils.copyProperties(printerLogDoc, archivePrinterLogDoc);
                        archivePrinterLogDoc.setExpreDate(new Date());
                        archiveList.add(archivePrinterLogDoc);
                    });
                    //批量入库归档表
                    Collection<ArchivePrinterLogDoc> insertAll = mongoTemplate.insertAll(archiveList);
                    if (!CollectionUtils.isEmpty(insertAll)) {
                        archiveTotal.addAndGet(insertAll.size());
                    }
                }
            } catch (Exception e) {
                log.error("achivedLogHandler_error:打印日志记录处理批次任务异常", e);
            } finally {
                this.query = null;
                this.countDownLatch.countDown();
            }
        }
    }
  
}

同样为了提高归档效率,采取了分页查询和线程池并行处理的方式。

注意:我们需要在原始的表中创建一些索引,防止数据量过大分页查询时出现造成查询出来的数据超过mongodb 最大字节数的异常。

mongodb的二级表名 mongodb表设计,mongodb的二级表名 mongodb表设计_spring boot_02,第2张

mongodb的二级表名 mongodb表设计,mongodb的二级表名 mongodb表设计_spring boot_03,第3张


https://www.xamrdz.com/database/6vu1964151.html

相关文章: