关注分享主机优惠活动
国内外VPS云服务器

解读k8s和log--journalbeat的源码

总结:但是也存在很多问题。 随着新设备的出现和对安全性的日益重视,这些缺点变得越来越明显。 例如,无法验证日志消息的内容。 数据格式松散。 日志检索效率低下。 无法记录有限的元数据存储。 该服务可以向您的项目添加一定量的元数据,例如二进制数据。

简介

日志系统的重要性是不言而喻的。 参考沪江日志系统的介绍,基本上日志数据在以下几个方面发挥着非常重要的作用:

数据检索:通过检索日志信息,识别相应的Bug,并找到解决方案

p>

服务诊断:收集日志信息并进行分析,了解服务器负载和服务情况运行状态

数据分析:可以进行进一步的数据分析,例如根据请求Masu中的课程ID找到用户最感兴趣的前10个课程。

日志+大数据+AI留下了很大的想象空间。
说到收集系统,流行的技术栈包括从以前的elk到现在的efk。 Logstash 已被 filebeat 取代。 当然,还有flume、fluidd等日志收集代理。 特别是,fluidd是cncf组织的产品,在k8s中得到广泛应用。 然而,由于 fluidd 是用 Ruby 编写的,因此并不能帮助你深入理解源代码。 当然,今天我们重点关注另一个代理--期刊beat。 顾名思义,这是 efk 堆栈的 Beats 系列的成员,专门用于收集 Journald 日志。

解读journalbeat源代码日志日志概述

Syslog长期以来一直是所有Unix系统的重要组成部分。 多年来,不同的 Linux 发行版中已经有各种实现来执行类似的任务。 它们采用相似的逻辑并且本质上使用相同的文件格式。 然而syslog也存在很多问题。 随着新设备的引入和对安全性的日益重视,日志消息内容无法验证、数据格式不规则、日志记录效率低下、元素受限等弊端日益凸显。你要保存数据,不能记录二进制数据等。
Journald解决了以上需求。 受 udev 事件启发的日志条目与环境块类似。 键值字段使用大写变量名称并用换行符分隔。 除了支持 ASCII 格式字符串外,还可以支持 ATA SMART 健康信息和 SCSI 数据等二进制数据。 应用程序和服务可以通过将项目域传递给 systemdjournald 服务来构建项目。 该服务可以向您的项目添加一定量的元数据。 这些可信域值由日志服务确定,客户端无法伪造。 Journald 允许您独立于处理器架构导出和异地读取日志数据。 这对于嵌入式设备来说是一个非常有用的功能,可以让维护人员更方便地分析设备的运行状态。
总结:

日志记录是新 Linux 系统的一项功能。

日记与传统的文件存储方式不同,它是二进制存储。 需要使用journalctl来查看。

Docker 对journald 的支持

journald 日志驱动程序将容器日志发送到systemd 日志。 可以使用journalctl命令、journal API或使用dockerlog命令检索日志条目。
除了json等日志格式之外,docker现在还有journald驱动。

我们目前的使用场景

对于我们的k8s集群来说,我们对所有docker输出日志格式都使用journald,因此主机的centos系统日志和docker日志都是使用journalbeat收集的。

journalbeat实现关键

journalbeat的整个实现过程基本上有两点。

遵循Beat框架和约定,以及其他社区贡献的Beats系列,例如packetbeat和mysqlbeat。 Journalbeat 仅实现 run 和 stop 方法。 然后,作为客户,您将收集到的数据发布到 Beats。

coreos开源go-systemd库的sdjournal部分用于读取journald日志。 其实sdjournal是使用cgo对journald log c接口的封装。

源码解读

程序入口:

package mainimport ( "log" "github.com/elastic/beats/libbeat/beat" "github. com / mheese/journalbeat/beater")func main() { err := Beat.Run("journalbeat", "", Beater.New) if err != nil { log.Fatal(err) }}

Journalbeat整体上一共实现了三个方法。 运行,停止,然后新的。
运行和停止,顾名思义,是控制运行和停止journalbeat的节拍。
And new:
必须遵循

// Creator 初始化并配置一个新的 Beater 实例用于执行。  // 实现Beat的run-loop.type Creator func(*Beat, * common.Config) (Beater, error)

Creator方法并返回返回的 Beater 实例由 Beat 控制。
具体实现:

// New 创建 Beaterfunc New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config := config .DefaultConfig var err error if err = cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("读取配置文件时出错: %v", err) } jb := &Journalbeat{ config: config, ned: make(chan struct{ }),cursorChan: make(chan string ), 待处理: make(chan *eventReference), 已完成: make(chan *eventReference, config.PendingQueue.CompletedQueueSize), } if err = jb.initJournal (); logp.Err("无法连接到 Systemd 日志:%v", err) return nil, err } jb.client = b.Publisher.Connect() return jb, nil} 

所有常见的 Beats 都有共同属性。 例如,以下done和client 属性。

//Journalbeat为主Journalbeat structtype Journalbeat struct { Done chan struct{} config config.Config client Publisher.Client Journal *sdjournal.Journal CursorChan chan string待处理,已完成chan *eventReference wgsync.WaitGroup} 

Done 是一个信号量,控制整个搅拌器的启动和停止。
客户端是与beats平台通信的客户端。 请注意,在初始化期间,会建立

jb.client = b.Publisher.Connect()

链接。
数据采集和发送时,也是通过客户端发送的。

select { case <-jb.done: return nil default: // 必须克隆以避免冲突,因为映射是指针... jb.client.PublishEvent(ref .body.Clone() ,publisher.Signal(&eventSignal{ref, jb.completed}),publisher.Guaranted) }

请注意上面的发送姿势以及前面提到的 Done 信号量的使用。
另一种方式是商业相关细节我就不多说了。

journalbeat是如何保证发送失败的日志重新提交的?

我个人认为这是我感觉最优雅的部分

全部。发送失败的日志在程序内发送。 退出前保存为json格式文件即可完成持久化。

 // 退出时完全消耗队列并刷新挂起队列到磁盘 defer func() { var wgsync.WaitGroup wg.Add(2) go func() { defer wg.Done() for evRef := 范围 jb.pending { 待处理[evRef.cursor] = evRef.body } }() go func() { defer wg.Done() for evRef := 范围 jb.completed { 完成[evRef.cursor] = evRef 。 body } }() wg.Wait() logp.Info("正在保存待处理队列。由 %d 条消息组成", len(diff(待处理, 已完成))) if err := flash(diff(pending, 已完成) , jb .config.PendingQueue.File); 错误!= nil {logp.Err("Errorwritingpendingqueue%s:%s", jb.config.PendingQueue.File, err) } }()

当程序启动时,之前持久化编码的传出文件为读。 失败日志,重新提交

// 加载之前保存的未发送事件队列,如果存在则尝试发布它们 if err := jb.publishPending(); err != nil { logp.Warn("Could未读取待处理队列:%s",err) }

客户端将收集的日志发布到 Beats,设置publisher.Warranty 模式,提供反馈。 成功与失败

jb .client.PublishEvent(ref.body.Clone(),published.Signal(&eventSignal{ref, jb.completed}),publisher.Guaranted)

其中Publisher.Signal(&eventSignal{ref, jb.completed }) 类似于回调。 如果成功,则将成功的 ref 写入 jb.completed。 对于客户端控制很有用。

维护了两个通道,一个用于存储客户端发送的日志,一个用于存储服务器成功接收的日志。 准确的比较允许您记录失败的提交并输入重新提交操作。

journalbeat 结构有两个属性:

 待处理,已完成 chan *eventReference

每当客户端发送日志时,日志就会被写入挂起。

casepublishedChan <- jb.client.PublishEvent(event,published.Signal(&eventSignal{ref, jb.completed}),publisher.Guaranted): 如果已发布 := <-publishedChan ; { jb.pending <- ref // 保存光标 if jb.config.WriteCursorState { jb.cursorChan <- rawEvent.Cursor } } }

publisher.Signal(&eventSignal{ref , jb . Completed}),回调将成功结果写入completed。
整个程序还启动了一个
go jb.managePendingQueueLoop()
协程,专门用于定期重新发送失败日志。

// ManagePendingQueueLoop 运行一个循环,管理一组等待 ACK 的事件 func (jb *Journalbeat) managePendingQueueLoop() { jb.wg.Add(1) defer wg.Done()ending. : = 地图[字符串]通讯on.MapStr{} Completed := map[string]common.MapStr{} // diff 返回此地图与其他地图之间的差异。   diff := func(this, other map[string]common.MapStr) map[string] common.MapStr { result := map[string]common.MapStr{} for k, v := range this { if _, ok : = other[k]; !ok { result[k] = v } } return result } // Flash 将 map[string]common.MapStr 保存到磁盘上的 JSON 文件 flash := func(source map[ string]common.MapStr , dest string) error { tempFile, err := ioutil.TempFile(filepath.Dir( dest), fmt.Sprintf(".%s", filepath.Base(dest))) if err != nil { return err } if err = json.NewEncoder (tempFile).Encode(source); err != nil { _ = tempFile.Close();return err } _ = tempFile.Close() return os.Rename(tempFile.Name(), dest) } // 完全消耗两个队列并在退出时将待处理队列刷新到磁盘 defer func( ) { var wgsync.WaitGroup wg . Add(2) go func() { defer wg.Done() for evRef := range jb.pending {ending[evRef.cursor] = evRef.body } }() go func( ) { defer wg.Done() for evRef := range jb.completed {completed[evRef.cursor] = evRef.body } }() wg.Wait() logp.Info("正在保存待处理队列由%d条消息组成", len(diff(pending, Completed) )) if err := flash(diff(pending, completed), jb.config.PendingQueue.File); err != nil { logp.Err("写入待处理队列 %s 时出错:%s", jb.config.PendingQueue.File, err) } }() // 定期将待处理队列刷新到磁盘 check := time.Tick (jb.config. PendingQueue.FlushPeriod) for { select { case <-jb.done: 返回 case p, ok := <-jb.pending: if ok {ending[p.cursor] = p.body } case c, ok := <-jb.completed: if ok { Completed[c.cursor] = c.body } case <-tick: result := diff(pending, Completed) if err := flash (result, jb. config.PendingQueue.File); err != nil { logp.Err("写入 %s 时出错:%s", jb.config.PendingQueue.File, err) }ending = 结果已完成 did = map[string]common.MapStr{} } }}

概述

当然还有其他几个细节,我就不一一赘述了。 比如有定期写游标的功能,日志格式转换等。 详细信息请参见源代码。 我主要讲了我认为什么是优雅以及写beaters需要考虑什么。

未经允许不得转载:主机频道 » 解读k8s和log--journalbeat的源码

评论 抢沙发

评论前必须登录!