(一)概述
关系型数据库是我们程序员日常开发中最常用的工具,通过Mysql、Oracle等软件,将我们要处理的数据存储在关系型数据库中。为了能够在Hadoop上分析这些关系型数据库,我们需要通一些设定好的框架,将关系型数据库中的【结构化】数据存储到HDFS上,以便采用MapReduce和Spark做进一步的运算处理。为了便于后续数据可视化工作,我们将数据计算出来后,通常会再倒回到关系型数据库中。这样,HDSF与关系型数据便有了频繁的交互,采用Sqoop和Canal一类的工具便非常合适。工具、Hadoop、数据库之间的关系如下:
Mysql – Sqoop/Canal – HDFS – MapReduce/Spark – HDFS - Sqoop/Canal – Mysql
从架构的角度看,Sqoop与Canal的设计动机有如下三种:
- 数据迁移:通常情况下,大多数公司的商业数据都是存储在关系型数据库中,如果数据规模持续增长,综合考虑扩展性、成本、安全等场景,需要迁移到Hadoop平台上,可以方便的使用诸如Sql on Hadoop工具进行大规模数据的分析。为了将数据每日全量或增量的同步到Hadoop平台,需要使用Sqoop与Canal框架;
- 结果展示可视化:Hadoop平台所处理的数据量非常大,通常是PB级别的,但产生的最终结果数据可能不会太大,如报表数据通常只有几KB的结果。为了更好的将这类数据做可视化分析,比较主流的做法是将这些分析数据,重新导入到关系型数据库中,由前端调用并进行可视化展示。
(二)Sqoop概述
官网对Sqoop的描述是:Sqoop是针对关系型数据库(RDBMS)与Hadoop之间进行数据传输的工具,数据传输的过程大部分是自动的,通过MapReduce过程来实现,只需要依赖数据库的Schema信息。Sqoop所执行的操作是并行的,数据传输性能高,具备较好的容错性,并且能够自动转换数据类型。
值得注意的是,Sqoop存在两个版本,版本号分别是1.4.x和1.9.x,通常被称为Sqoop1和Sqoop2。Sqoop2在架构和实现上,对于Sqoop1做了比较大幅度的改进,因此两个版本之间是不兼容的。
Sqoop1的基本架构如下:
Sqoop1主要采用命令行的方式来执行任务,如果需要客户端调用,对应的命令信息需要自行配置和拼接。Sqoop1必须严格执行JDBC机制,加密上需要用户自行设定。在执行上,当数据同步的shell命令提交之后,Sqoop会从配置文件中读取数据库的元数据信息,并根据表的大小将数据分割成很多切片,每一个切片由一个MapTask来处理,同时处理数据的同步。Sqoop允许用户定制各种参数来控制对应的任务,包括并行度、数据源、超时时间等。Sqoop1的命令主要分为import命令和export命令。
import命令对应例子如下:
$ sqoop import --connect jdbc:mysql://database.example.com/employees --username admin --password 12345
常用参数如下:
export命令对应例子如下:
sqoop export --connect jdbc:mysql://db.example.com/foo --table bar --export-dir /results/bar_data --validate
常用参数如下:
Sqoop2的基本架构如下:
Sqoop2采用C/S架构,即Client和Server,实现了配置信息的API化,允许用户通过Java等更高端的语言来开发任务作业。Sqoop2 Client包含了用户使用Sqoop2的方式,包括了客户端命令行及浏览器两种方式,允许用户直接通过HTTP的方式来完成数据的同步工作。Sqoop2 Server负责实现数据的同步工作,主要包括了三个组件:
- Connector:抽象成为三个模块,主要负责MapReduce作业的生成和运行工作。Partitioner负责如何对数据进行分片;Extractor负责将分片的数据切割成为一条一条的数据;Loader负责读取数据,并以约定的格式写入到目标数据源中。
- Metadata:存储了Sqoop2中的元数据信息,包括了可用Connector列表、作业列表及Link信息等。
- Restful及Http Server:负责与Client进行通信,相应用户请求。
下面介绍一下Java客户端的例子:
主方法:
public class SqoopClient {
...
}
项目依赖:
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-client</artifactId>
<version>${requestedVersion}</version>
</dependency>
初始化:
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
创建及保存链接:
MLink link = client.createLink("connectorName");
link.setName("Vampire");
link.setCreationUser("Buffy");
MLinkConfig linkConfig = link.getConnectorLinkConfig();
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/my");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("root");
linkConfig.getStringInput("linkConfig.password").setValue("root");
Status status = client.saveLink(link);
if(status.canProceed()) {
System.out.println("Created Link with Link Name : " + link.getName());
} else {
System.out.println("Something went wrong creating the link");
}
创建及保存Job:
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
MJob job = client.createJob("fromLinkName", "toLinkName");
job.setName("Vampire");
job.setCreationUser("Buffy");
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Name: "+ job.getName());
} else {
System.out.println("Something went wrong creating the job");
}
启动任务:
MSubmission submission = client.startJob("jobName");
System.out.println("Job Submission Status : " + submission.getStatus());
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
System.out.println("Hadoop job id :" + submission.getExternalId());
System.out.println("Job link : " + submission.getExternalLink());
Counters counters = submission.getCounters();
if(counters != null) {
System.out.println("Counters:");
for(CounterGroup group : counters) {
System.out.print("\t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("\t\t");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getExceptionInfo() != null) {
System.out.println("Exception info : " +submission.getExceptionInfo());
}
MSubmission submission = client.getJobStatus("jobName");
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
submission.stopJob("jobName");
Sqoop1与Sqoop2的主要区别如下:
(三)Canal概述
虽然Sqoop能够实现对关系型数据的全量同步,但在很多业务场景下,由于数据量非常非常大,每天全量同步,对于Hadoop的压力较大,因此要慎用。出于性能和业务两方面的考虑,我们需要实现基于CDC的数据同步方案,也就是捕获数据源中更新的数据,从而获得增量数据的更新。
CDC方案在实际中应用场景广泛,包括:
- 异地机房同步:公网由于带宽有限,因此需要严格的控制传输流量;
- 数据实时备份:类似于Mysql的Master/Slave模式;
- 业务缓存刷新:缓存系统通过增量数据,获得数据库更新信息,进而刷新缓存;
- 数据全库迁移:在历史数据迁移完成前,先获取新增数据并提供给系统使用;
- 搜索索引构建:例如倒排索引、拆分异构索引等;
- 增量业务逻辑:很多业务逻辑只需要用到增量数据即可。
Canal工作原理:
Mysql主备复制原理:
- Master 将数据变更写入二进制日志( binary log);
- Slave 将 Master的 binary log events 拷贝到它的中继日志(relay log);
- Slave 重放中继日志的事件,将数据变更反映它自己的数据。
Canal在其中的作用:
- Canal 模拟 MySQL的交互协议,伪装自己为Slave,向Master发送dump 协议;
- Master收到dump请求,开始推送 binary log 给Slave(即Canal);
- Canal解析binary log,获得Mysql的更新信息。
Canal的GitHub地址为:https://github.com/alibaba/canal
Canal的客户端例子如下:
依赖配置:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
ClientSample代码:
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
运行任务并查看结果:
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
-> `ID` int(11) NOT NULL AUTO_INCREMENT,
-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true