For enterprises that want to move data into their Hadoop clusters quickly, Apache Kafka is a natural choice (for an introduction to how Kafka works, see 《Apache kafka入门篇:工作原理简介》).
This article is written for technical readers. After reading it, you will understand how I used Kafka to write data from a relational database management system (RDBMS) into Hadoop in real time.
Overall Solution Architecture
The diagram below shows how transactional data in an RDBMS flows through Kafka and Flume into a transactional Hive table on Hadoop.
[Figure: overall solution architecture]
Seven Steps to Write RDBMS Data into Hadoop in Real Time
Now let's dive into the details of the solution. I will show how you can load data into Hadoop in real time in just a few steps.
1. Extract Data from the RDBMS
Every relational database keeps a log file that records the latest transactions. The first step of our streaming solution is to obtain these transactions in a format that Hadoop can parse. (The original author does not explain how to parse the transaction log, possibly because it involves proprietary details.)
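The original article does not show this step, so purely as a hedged illustration: if the source database were MySQL, its transaction log is the binary log, and the stock mysqlbinlog utility can dump it into a text form that downstream tooling can parse (the log path and file names below are placeholders):
$ mysqlbinlog /var/lib/mysql/mysql-bin.000001 > /tmp/salesdb-transactions.txt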
2. Start a Kafka Producer
A process that publishes messages to a Kafka topic is called a producer. A topic groups messages of the same kind in Kafka. The transactions from the RDBMS will be turned into messages on a Kafka topic. In our example, we have a sales-team database whose transactions are published to a Kafka topic. The following steps are needed before starting the Kafka producer:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper www.iteblog.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper www.iteblog.com:2181
SalesDBTransactions
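Optionally (this check is not part of the original steps), you can confirm the topic's partition and replication settings with kafka-topics.sh before wiring anything else up:
$ bin/kafka-topics.sh --describe --zookeeper www.iteblog.com:2181 --topic SalesDBTransactions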
3. Set Up Hive
We create a table in Hive to receive the sales-team database's transactions. In this example we will create a table named customers:
[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets stored as orc
location '/user/iteblog/salescust'
TBLPROPERTIES ('transactional'='true');
To enable transactions in Hive, the following configuration is required:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
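Depending on your Hive version, a transactional table like the one above usually needs a few more settings. The values below are only a sketch of what is commonly configured alongside DbTxnManager; verify them against your cluster's documentation:
hive.support.concurrency = true
hive.enforce.bucketing = true
hive.exec.dynamic.partition.mode = nonstrict
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 1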
4. Set Up a Flume Agent
Next we create a Flume agent that will deliver the data from the Kafka topic into the corresponding Hive table. Follow the steps below to set up the environment before starting the Flume agent:
$ pwd
/home/iteblog/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
$ pwd
/home/iteblog/streamingdemo/flume
$ mkdir logs
Then create a log4j properties file:
[iteblog@sandbox conf]$ vi log4j.properties
flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/iteblog/streamingdemo/flume/logs
flume.log.file=flume.log
Finally, our Flume agent configuration is as follows:
$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupId = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
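# The partition value below (resolved from the timestamp header added by interceptor i1)
# fills the single 'time' partition column of the customers table created in step 3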
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
5. Start the Flume Agent
Start the Flume agent with the following command:
$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
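Once the agent is running, you can tail its log file (the path follows from the log4j settings created earlier) to confirm that the Kafka source and the Hive sink started without errors:
$ tail -f /home/iteblog/streamingdemo/flume/logs/flume.log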
6. Start the Kafka Stream
As a demonstration, we type the simulated transaction messages below by hand; in a real system they would be generated by the database itself:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"
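To double-check that these messages actually reached the topic (an optional step not in the original walkthrough), you can attach a console consumer in another terminal:
$ bin/kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic SalesDBTransactions --from-beginning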
7. Receive the Data in Hive
With all of the steps above completed, when you send data to Kafka you will see it arrive in Hive within a few seconds.
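As a final sanity check (this query is my addition, not part of the original article), you can reuse the beeline connection from step 3 and look at the rows that have landed in the customers table:
[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
0: jdbc:hive2://> select id, name, company from customers limit 10;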