关注分享主机优惠活动
国内外VPS云服务器

使用k8s和logs-- golang实现Fluent Bit输出插件

摘要:介绍已实现的插件。 目前有很多组件用于收集和处理社区日志。 大多数都是以前的解决方案、社区、解决方案、大数据中使用的。 实现集中日志收集的合适解决方案。 主要负责采集、处理、传输。

使用golang实现Fluent Bit的输出插件简介

目前社区日志采集的组件有很多,我们处理它们。 此前大数据常用的方案有elk Logstash方案、cncf社区的Fluentd、efk方案的filebeat、flume等。 Fluent Bit是一个用C语言编写的高性能日志收集组件。 整个架构源自fluidd。 官方对比数据为:

Fluentd Fluent Bit
范围 容器/服务器 容器/服务器
语言 C 和 Ruby C
内存 ~40MB ~450KB
性能 高性能 高性能
依赖项 It是作为红宝石建造的,因此需要一定数量的宝石。 零依赖,除非特殊插件需要。
插件 超过 650 个可用插件 大约 35 个可用插件
许可证 Apache License v2.0 Apache License v2.0

查看数据可以看到流畅位是怎样处理的执行。 增加更多资源。 想要实现集中式日志收集,适合采用Fluent Bit+FluentD方案。 Fluent bit主要负责收集,fluentd负责处理和发送。

扩展输出插件

Fluent bit本身是用C语言编写的,导致插件扩展困难。 或许官方考虑到了这一点,实现了 Fluent-bit-go,它允许你使用 go 语言创建插件。 目前仅支持写入输出。
Fluent-bit-go实际上是使用cgo来封装C接口。 代码比较简单,主要分析一个主文件。

打包出/*#include #include "flb_plugin.h"#include "flb_output.h"*/import "C" import " fmt"import "unsafe" // Fluent Bit定义与 coreconst FLB_ERROR = C.FLB_ERRORconst FLB_OK = C.FLB_OKconst FLB_RETRY = C.FLB_RETRYconst FLB_PROXY_OUT 匹配的常量Put_plugin = c.flb_proxy_output_pluginConst FLB_PROXY_GOLANG = c.flb_proxy_golang // 定义插件的本地类型 DefinitionType Flbplugin C.Struct_ flb_plugin_proxytype Flboutplugin C.Struct_flbgo_output_plugin // 当触发 FlbpluginInit 时, // 传递插件上下文,下一步调用此 FLBPluginRegister( ) 函数并填写 // 必需的信息:类型、代理类型、标志名称和 // 描述。  func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int { p := (*FLBPlugin) (unsafe.Pointer(ctx)) p._type = FLB_PROXY_OUTPUT_PLUGIN p.proxy = FLB_PROXY_GOLANG p.flags = 0 p.name = C.CString(name) p.description = C.CString(desc) return 0}//释放插件初始化函数FLBPluginU分配的资源nregister(ctx unsafe.Pointer) { p := (*FLBPlugin) (unsafe.Pointer(ctx)) fmt.Printf("[flbgo] 取消注册 %v", p) C.free(unsafe.Pointer(p.name) ) C.free(unsafe.Pointer(p.description))}func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string { _key := C.CString(key) return C.GoString(C.output_get_property(_key, unsafe. Pointer(ctx)))}

主要用于创建插件,比如用于注册组件的FLBPluginRegister,用于获取配置文件配置参数的FLBPluginConfigKey,定义一些必要的变量和方法。
P.S.
其实是用golang来调用 Fluent-bit-go 也实现了实际的业务逻辑,最后将 c[k4 ]share .so 编译成动态链接库。

自定义 Fluent-bit-kafka-output 插件

实际上,kafka 输出插件是在 Fluent-bit Masu.0 之后提供的。 .13. 然而,在实际项目中,它无法满足您的需求,您需要对其进行自定义。
当然,下面的代码主要作为演示来解释如何创建输出插件。

编写和分析代码

代码优先:

package mainimport ( "C" "fmt" "io" "log" "reflect” “strconv” “字符串” “时间” “不安全” “github.com/Shopify/sarama” “github.com/fluence/fluid-bit-go/output” “github.com/ugorji/go " /codec")var ( Brokers []string Producer sarama.SyncProducer timeout = 0 * time.Minute topic string module string messageKey string)//export FLBPluginRegisterfunc FLBPluginRegister(ctx unsafe.Pointer) int { return Output.FLBPluginRegister(ctx, " out_kafka ", "Kafka 输出插件.!")}//导出 FLBPluginInit// ctx (context) 指向 Fluentbit 上下文的指针 (state/c 代码)func FLBPluginInit(ctx unsafe.Pointer) int { if bs := Output.FLBPluginConfigKey ( ctx , "Broker"); bs != "" { Broker = strings.Split(bs, ",") } else { log.Printf("必须配置 Broker") 返回输出。  FLB_错误if tp := Output.FLBPluginConfigKey(ctx, "topic"); tp != "" { topic = tp } else { log.Printf("必须配置主题") return Output.FLB_ERROR } if mo := 输出。    FLBPluginConfigKey(ctx, "module"); mo != "" { module = mo } else { log.Printf("必须配置模块") return Output.FLB_ERROR } if key := Output.FLBPluginConfigKey(ctx, "message_key" ); key != "" { messageKey = key } else { log.Printf("message_key 必须设置") return Output.FLB_ERROR } config := sarama.NewConfig() config.Producer.Return. Successes = true 如果 required_acks : = 输出 .FLBPluginConfigKey(ctx, "required_acks");requiredAcks = sarama.RequiredAcks(acks) } } if 压缩编解码器 := Output.FLBPluginConfigKey(ctx, "压缩编解码器"); Compression_codec != "" { if 编解码器, error := strconv.Atoi(compression_codec); { config .Producer. Compression = sarama. CompressionCodec(codec) } } if max_retry := Output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" { 错误 if max_retry := strconv.Atoi(max_retry) ; = nil { config.Producer.Retry.Max = max_retry } } if timeout == 0 { timeout = 5 * time.Minute } // 如果 Kafka 未运行,则等待 Kafka 连接的截止时间 init 将 (timeout) 添加到 := time .Now().try := 0;   time.Now().Before(deadline); Try++ { var err 错误 if Producer == nil { producer, err = sarama.NewSyncProducer(brokers, config) } if err == nil { return Output.FLB_OK } log.Printf("无法连接到 Kafka: (%s) 正在重试...", err) 睡眠时间。 ( time.Second * 30) } log.Printf("Kafka failed toResponse after %s", timeout) return Output.FLB_ERROR}//导出 FLBPluginFlush//如果需要发送数据,则 FLBPluginFlush 是 Fl​​uent- 从 bit 调用。    当需要发送数据时由 Fluent-bit 调用。  func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { var h codec.MsgpackHandle var b []byte var minterface{ } var err error b = C.GoBytes(data, length) dec : = codec.NewDecoderBytes(b, &h) // 迭代原始 MessagePack 数组 var msgs []*sarama.ProducerMessage for {// 解码 msgpack 数据 err = dec.Decode(&m) if err != nil { if err == io.EOF { Break } log.Printf("无法解码 msgpack 数据: %v", err) 输出返回。    FLB_ERROR } // 获取切片及其两个条目:时间戳和映射。  Slide := Reflect.ValueOf(m) data := slip.Index(1) // 将切片数据转换为实际地图并迭代它。  mapData := data.Interface ().(map[interface{}]interface{}) flattenData, err := Flatten(mapData, "", UnderscoreStyle) if err != nil {break } message := "" host := "" for k, v := range flattenData { value := "" switch t := v.(type) { case string: value = tcase []byte: value = string(t) 默认值: value = fmt.Sprintf("%v", v) } if k == "pod_name" { host = value } if k == messageKey { message = value } } if message == "" || host == "" { Break } m := &sarama.ProducerMessage{ topic: topic, key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host , module)), value: sarama .ByteEncoder(message), } msgs = append(msgs, m) } err =Producer.SendMessages(msgs) if err != nil { log.Printf("发送 kafka 消息失败: % s”,错误)输出。   FLB_ERROR } return Output.FLB_OK}//FLBPluginExitfunc 导出FLBPluginExit() int {Producer.Close() return Output.FLB_OK}func main() {}

FLBPluginExit 插件退出时应执行的方法,例如关闭连接。

FLBPluginRegister 注册一个插件。

FLBPluginInit 插件初始化

FLBPluginFlush 将数据刷新到输出。

FLBPluginConfigKey获取配置文件中的参数

补充说明
当然,除了FLBPluginConfigKey之外,你还可以通过以下方式获取配置参数获取环境变量也可以。
ctx对应一个上下文,负责上下文之间的数据传输。

编译并运行

编译时间

 继续构建 -buildmode=c-shared -o out_kafka.so。  

生成out_kafka.so

运行时

/fluid-bit/bin/fluid-bit" -c /fluid[ k4 ]bit /etc / Fluent-bit.conf -e / Fluent-bit/out_kafka.so

总结

使用类似的书写结构,可以自定义你自己的写作结构。

未经允许不得转载:主机频道 » 使用k8s和logs-- golang实现Fluent Bit输出插件

评论 抢沙发

评论前必须登录!