关注分享主机优惠活动
国内外VPS云服务器

k8s 和 Audit-- 添加 clickhouse 作为堆接收器

摘要:简介在资源审计和核算方面,容器和虚拟机之间存在显着差异。 它支持许多称为 的输出。 因此,本文主要讲一下如何增加。 事实上,你也可以基于添加和更改来运行,但是你需要安装一些包指令,这会导致镜像更大。 实际运行日志截图 写入性能非常出色,因此运行非常稳定。

简介

从 k8s 资源审计和核算的角度来看,容器和虚拟机之间存在显着差异。 与虚拟机相比,容器并不容易实现。
您可以使用Heapster或Prometheus来收集资源指标。 上一篇文章提到,prometheus的存储瓶颈和大量数据的查询使得它很容易出现OOM。 所以我选择了赫普斯特。 此外,heapster不仅内部实现了许多聚合器和计算器,还执行许多聚合层任务。 prometheus 要求在查询时完成聚合。
Heapster 支持许多称为接收器的指标输出。 目前支持的接收器有:

我更喜欢Clickhouse数据库。 其实我在之前的文章中已经多次介绍过Click House。
因此,本文主要介绍如何向Heapster添加clickhouse接收器。

代码分析与实现

如果你看一下代码,添加一个接收器是非常简单的。 常见的工厂设计模式是简单地实现 Name、Stop 和 ExportData 接口方法。 最后,提供工厂将调用的初始化函数。

初始化方法NewClickhouseSink

具体代码:

config, err := clickhouse_common.BuildConfig(uri) if err != nil { return nil, err } client , error := sql.Open("点击房子", config.DSN) if err != nil { glog.Errorf("连接到 clickhouse: %v", err) return nil, err } sink := &clickhouseSink{ c: *config, client: client, conChan: make( chan struct{}, config.Concurrency), } glog.Infof("已使用选项创建 Clickhouse 接收器:主机:%s 用户:%s db:%s", config.Host, config.UserName, config.Database) return 接收器, nil

基本上就是获取配置文件并初始化clickhouse客户端。

在factory.go的build方法中添加刚刚实现的初始化函数:

 func ( this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) { switch uri.Key { case "elasticsearch": 返回elasticsearch.NewElasticSearchSink(&uri.Val) case "gcm": 返回 gcm.CreateGCMSink ( &uri.Val) case "stackdriver": return stackdriver.CreateStackdriverSink(&uri.Val) 情况 "statsd": 返回 statsd.NewStatsdSink(&uri.Val) 情况 "graphite": 返回 graphite.NewGraphiteSink(&uri.Val) 情况 "hawkular": 返回 hawkular.NewHawkularSink(&uri.Val) 情况 "influxdb" :返回 influxdb.CreateInfluxdbSink(&uri.Val) 案例“kafka”:返回 kafka.NewKafkaSink(&uri.Val) 案例“librato”:返回 librato.CreateLibratoSink(&uri.Val) 案例“log”:返回 LogSink(), "metric" if nil: 返回metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{ core.MetricCpuUsageRate.MetricDescriptor.Name, core.MetricMemoryUsage.MetricDescriptor.Name}) case "opensdb": 返回opentsdb.CreateOpenTSDBSink(&uri.Val) 案例“wavefront”:返回wavefront.NewWavefrontSink(&uri.Val) 案例 'riemann': riemann.CreateRiemannSink(&uri.Val) 案例 'honeycomb': Honeycomb.NewHoneycombSink(&uri.Val) 案例 'clickhouse': return clickhouse.NewClickhouseSink(&uri.Val) 默认值: return nil, fmt.Errorf("Sink 无法识别:%s", uri.Key) }}

名称和停止

func (sink *clickhouseSink) Name() string { return "clickhouse"}func (tsdbSink *clickhouseSink) Stop() { // 不执行任何操作}

stop 函数用于执行某些非托管资源的关闭,在堆关闭时调用。

ExportData

这是中心位置。

func (sink *clickhouseSink) ExportData(dataBatch *core.DataBatch) { sing.Lock() defer sing.Unlock() if err := sing.client.Ping(); err != nil { glog.Warningf("Ping 到 Clickhouse 失败:%v", err) return }dataPoints := make([]point, 0, 0) for _, metricSet := range dataBatch.MetricSets { for metricName, metricValue := range metricSet.MetricValues ​​{ var value float64 if core.ValueInt64 == metricValue.ValueType { value = float64(metricValue.IntValue) } else if core.ValueFloat == metricValue.ValueType { value = float64(metricValue.FloatValue) } else { continue } pt := point{ name: metricName, cluster: sing.c.ClusterName, val: value, ts: dataBatch.Timestamp, } key, value := range metricSet.Labels { if _,exists := clickhouseBlacklistLabels[key];if value != "" { if key == "labels" { lbs := strings.Split(value, ",") for _, lb := range lbs { ts := strings.Split(lb, ":") if len(ts) == 2 && ts[0] != "" && ts[1] != "" { pt.tags = 附加(pt.tags, fmt.Sprintf("%s=%s", ts ) [0], ts[1])) } } } else { pt.tags = 附加(pt.tags, fmt.Sprintf("%s=%s", key, value)) } } } } dataPoints = 附加( dataPoints, pt) if len(dataPoints) >= sinners.c.BatchSize {sink.concurrentSendData(dataPoints) dataPoints = make([]point, 0, 0) } } } if len(dataPoints) >= 0 { sing.concurrentSendData(dataPoints) } sing.wg.Wait()}

主要应注意以下几点。

数据格式转换。 您需要将heapster的DataBatch转换为所需的存储格式。 事实上,对于曾经运行过具有多个输出的管道的任何人来说,这都很容易理解。

批量写入。 一般来说,当数据量较大时,批量写入是一种有效的方法。

根据配置参数同时写入目标存储。 使用 Golang 的协程。 下面的代码实现了一个协程来发送数据。

func (sink *clickhouseSink)并发SendData(dataPoints []point) { sing.wg.Add(1) // 使用通道降低运行接收器的最大并发请求数 阻止直到 .conChan <[ k4] struct{}{} go func(dataPoints []point) { sing.sendData(dataPoints) }(dataPoints)}

获取配置参数

这块主要是clickhouse.go执行检索配置参数、初始化参数以及验证 dockerfile 中配置参数的任务。更改

原始基础映像基于暂存

FROMScratchCOPY heapsterevener /COPY ca-certificates.crt /etc/ssl/certs/#nobody:nobodyUSER 65534:65534ENTRYPOINT ["/ heapster"]

由于需要更改时区,改为基于alpine。

FROM alpineRUN apk 添加 -U tzdataRUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtimeCOPY heapsterevener /COPY ca-certificates.crt /etc/ssl / certs/RUN chmod +x /heapsterENTRYPOINT ["/heapster"]

其实你也可以添加时区或者基于scratch进行更改,但是有些包需要安装说明,结果会是: 图像变大。 与其做这样的事情,我觉得基于我熟悉的alpine来实现会更好。

概述

分叉项目的地址。 实际运行日志截图:

ck的写入性能非常出色,因此运行非常稳定。

未经允许不得转载:主机频道 » k8s 和 Audit-- 添加 clickhouse 作为堆接收器

评论 抢沙发

评论前必须登录!