本节内容以go语言设计一个简易的日志收集系统,并且完成日志的客户端开发。
项目背景
每个系统都有日志,当系统出现问题的时候需要通过日志解决问题。
当系统机器比较少时,登录服务器即可查看日志。但当机器规模较大时,登录机器看就不太现实,这个时候就需要日志收集系统。
解决方案
- 把机器上的日志实时收集,统一存储到一个中心系统。
- 对日志建立索引,通过搜索即可找到对应的日志。
- 通过提供友好的web界面,完成日志的搜索查找。
面临的问题
- 实时日志量非常大,每天几十亿条
- 日志准时收集,延迟控制在分钟级别
- 能够水平扩展
业界方案ELK
ELK的方案架构如下,
ELK存在的问题:
- 运维成本高,每增加一个日志收集,都需要手动修改配置
- 监控缺失,无法准确获取logstash的状态
- 无法做定制化开发以及维护
我们的日志收集系统
各组件介绍如下
- Log Agent,日志收集客户端,用来收集服务器上的日志
- Kafka,高吞吐量的分布式队列,linkin开发,apache顶级开源项目。使用Kafka可以实现日志收集和日志处理的解耦。
- ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
- Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台
- Storm,
Storm
是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。 - 除了图中的基本部件外,Log Agent需要连接ETCD和WEB界面进行日志存储和日志配置。
Kafka应用场景
1. 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性
2. 应用解耦,通过消息队列
3. 流量削峰
zookeeper应用场景
1. 服务注册&服务发现
2. 配置中心
3. 分布式锁
- Zookeeper是强一致的
- 多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功
项目实践
我们的项目实践又如下几部分内容构成:
- linux上的kafka安装
- 生产者log agent开发
- etcd+contex+kafka消费者开发
- WEB日志管理平台开发
kafka安装
- 安装JDK,从oracle下载最新的SDK安装
- 安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/
- mv conf/zoo_sample.cfg conf/zoo.cfg
- 编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\
- 运行bin/zkServer.cmd
- 安装kafka
- 打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2
- 打开config目录下的server.properties, 修改log.dirs为D:\kafka_logs, 修改advertised.host.name=服务器ip
- 启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties
生产者log agent
我们构建一个日志客户端log agent作为生产者。需要用到的go的包为:
- Import “github.com/Shopify/sarama":基于sarama第三方库开发的 kafka client,往kafka里面发送消息
- Import “github.com/hpcloud/tail”:HP团队出的tail库,常用于日志收集
- Import “github.com/astaxie/beego/config”
- Import “github.com/astaxie/beego/logs”
log agent设计
我们的go语言设计log agent用到的模块和模块数据流如下:
kafka示例代码
Import “github.com/Shopify/sarama"
程序示例:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test, my message is good")
client, err := sarama.NewSyncProducer([]string{"192.168.31.177:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
tailf介绍
tailf组件使用
Import “github.com/hpcloud/tail”
程序示例:
package main
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
func main() {
filename := "./my.log"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for true {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}
配置文件库使用
Import “github.com/astaxie/beego/config”
程序示例:
package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
func main() {
conf, err := config.NewConfig("ini", "./logcollect.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
port, err := conf.Int("server::port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
}
fmt.Println("Port:", port)
log_level, err := conf.Int("log::log_level")
if err != nil {
fmt.Println("read log_level failed, ", err)
return
}
fmt.Println("log_level:", log_level)
log_path := conf.String("log::log_path")
fmt.Println("log_path:", log_path)
}
日志库的使用
Import “github.com/astaxie/beego/logs”
程序示例:
package main
import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)
func main() {
config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
}
logs.SetLogger(logs.AdapterFile, string(configStr))
logs.Debug("this is a test, my name is %s", "stu01")
logs.Trace("this is a trace, my name is %s", "stu02")
logs.Warn("this is a warn, my name is %s", "stu03")
}
消费者etcd+contex+kafka
etcd介绍
etcd介绍与使用:
- 概念:高可用的分布式key-value存储,可以使用配置共享和服务发现
- 类似项目:zookeeper和consul
- 开发语言:Go
- 接口:提供restful的http接口,使用简单
- 实现算法:基于raft算法的强一致性、高可用的服务存储目录
etcd的应用场景:
- 服务发现和服务注册
- 配置中心
- 分布式存储
- master选举
context使用介绍
contex主要作用如下:
- 如何控制goroutine
- 如何保存上下文数据
消费者具体程序运行过程参考:
]
WEB日志管理平台
我们会用到如下知识点:
- ElasticSearch介绍与使用
- kibana介绍与使用
1. ElasticSearch安装
详见上节内容
2. kibana安装
(1) 下载ES,下载地址:https://www.elastic.co/start
(2)解压缩
(3)启动kibana, ./bin/kibana.bat
(4)在浏览器中访问: http://localhost:5601
Username: elastic Passwd: changeme
3. nginx安装
(1)下载nginx,下载地:https://nginx.org
(2)解压缩
(3)启动nginx, ./nginx
(4)在浏览器中访问: http://localhost
4. mysql事务
(1) 原子性
(2)一致性
(3)隔离性
(4)持久性
5. mysql事务
(1)import (“github.com/jmoiron/sqlx")
(2)Db.Begin()开始事务
(3)Db.Submit()提交事务
(4)Db.Rollback() 回滚事务
6. Beego web开发
(1)规划好url
(2)添加路由
(3)开发controller,继承beego.Controller
(4)测试
Beego 模板渲染
1)把需要传给模板的变量赋值给beego.controller里面的Data字段
2)实现模板逻辑