前言
在齿研社的时候随着用户的增长mysql实现的搜索功能已经无法支撑用户任意输入搜索的诉求,速度也越来越慢,于是技术团队开始计划使用es重写搜索。那么摆在我们面前的第一个问题就是如何把数据同步到es。
技术选型
常见cdc开源方案
开源方案的缺点
1.需要搭建额外的服务,增加了一定的运维成本。
2.cancel等部分社区不活跃。
云供应商方案
阿里云数据传输服务DTS
数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体,构建安全、可扩展、高可用的数据架构。
dts的优点
基于日志的cdcd机制,支持增量全量断点续传。而我们的数据库也是阿里云的rds mysql,订阅的操作直接在控制台就可以操作,且订阅的schema发生变化(比如增加订阅的table),也无需重启订阅服务。
在支持数据同步的同时,也支持数据的迁移,方便我们把开发环境的一些数据配置同步到线上。
同时支持多种常见数据源
同数据源可直接全量同步,异构数据源需要映射对应的schema。
比如将mysql数据同步至es。
binlog的可用性
由于阿里云是会自动删除binlog,每个大小512,默认存在3小时,可设置默认存储个数,存储空间不够可能会删除最近产生的binlog。所以我们可以使用将binlog同步至oss的方式存储
MySQL数据库逻辑备份_数据库备份(DBS)-阿里云帮助中心 (aliyun.com)
使用须知
sdk实际上就是一个kafka client的封装,consumer group在控制台配置,一个订阅对象,只有单个topic,最多可添加20个消费组,通过创建多个消费组可以实现数据的重复消费。
一个消费组只能创建一个消费者(consumer),并通过该消费者执行数据消费。
单行数据超过16MB,则可能会导致消费客户端内存OOM(Out of Memory)。
两种模式
ASSIGN模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为ASSIGN模式时,建议只启动一个SDK客户端。
SUBSCRIBE模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为SUBSCRIBE模式时,您可以在一个消费组下同时启动多个SDK客户端,以实现灾备。实现原理是当消费组下的正常消费数据的客户端发生故障后,其他的SDK客户端将随机且自动地分配到partition 0,继续消费。
管理消费位点
sdk首次启动,或者重启服务,内部重试的情况下,我们需要传入offset。
- 从源码得知sdk客户端每5秒保存一次,并提交至dts服务器,所以我们可以在
- SDK客户端所在服务器localCheckpointStore文件查找最近的一次消费位置。
- 控制台的 数据消费 页面查看。
- 实现UserMetaStore()接口,将消费点位存储到
数据库之类的存储介质,该接口每5秒调用一次。
消费速度变慢的排查方式
通过查询dts统计信息中的参数DStoreRecordQueue和DefaultUserRecordQueue
如参数DStoreRecordQueue保持为0,则表示DTS服务器拉取数据速度变慢。
如参数DefaultUserRecordQueue保持为默认值512,则表示SDK客户端消费数据的速度变慢。
增加消费速度的几种解决方案。
1.增加消费者线程数,需要注意提交offset的时候需要提交连续的offset的最大值,来保证消息不丢失。
2.付费增加订阅对象,也就是增加topic。让不同topic专注于处理无交集的表日志。