之前的一篇文章【Flink系列】构建实时计算平台——特别篇,用InfluxDb收集Flink Metrics
,里面写道
Influxdb 1.8,100个作业的情况下, 内存占用峰值会超过20GB,这个时候容器会自动重启,客户端无法上报
现在又过了一年多,现在部署于K8s中的Influxdb占用内存经常超过90%,经过检查,发现有几个Flink作业,在修改InfluxDB Reporter之前就提交过了,仍然在大量上报指标。
说明解决方案还不够好。
目标
- 需要找到一种,可以不重启用户作业的方法,直接过滤指标。
- 因为Flink作业支持Prometheus的指标上报,使用InfluxDB仅收集Checkpoint类型的指标,所以需要过滤其他所有指标。
过程
大约6年前,早在大四的时候,有室友玩过OpenResty,其实就是Nginx™ + Lua™,那个时候便知道了这种技术。
于是花了两个晚上,下班回家熬夜,再加上白天工作的时候,忙里抽空 对着Lua 官方文档 + NginxLuaModule鼓捣出来了。
由于InfluxDb官方经常翻,便知道他的文档专门讲了协议,幸好是纯文本的协议。
HTTP报文如下:
POST /write?db=var1&rp=retentionPolicy&precision=[h/m/s/ms/u/n]&consistency=
Header: BasicAuth
Content-Type: text/plain; charset=UTF-8
Body
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
知道协议之后,就可以通过HTTP请求体来过滤 measurement,将不需要的指标全部删除。
方法
使用OpenResty的 access_by_lua 方法,访问读取请求体:
不完全的代码
access_by_lua_block {
ngx.req.read_body()
local data = ngx.req.get_body_data()
//...slankka...
for s in data:gmatch("[^\r\n]+") do
if string.find(s, "jobmanager_job_lastCheckpointExternalPath") then
ngx.req.set_body_data(s)
ngx.log(ngx.ERR, "Captured success:", s)
break
end
end
//...slankka...
}
验证
方法一:可通过抓包的方式验证。
抓包的方法是,tcpdump 直接在InfluxDB所在的服务器进行抓包。
tcpdump tcp -t -s 0 -c 100 port 8086 and src net 10.11.12.13 -w ./influxdb_slankka_traffic.cap
方法二:通过Influx 客户端 InfluxQueryLanguage语法
show measurements on slankka;
//应该只看到这一个指标
jobmanager_job_lastCheckpointExternalPath
方法三:通过Influxdb厂家的Chronograf,直接连接到Influxdb进行查看
上一篇关于InfluxDB的文章【Flink系列】构建实时计算平台——特别篇,用InfluxDb收集Flink Metrics
已经有Chronograf的安装说明,不再赘述。
总结
OpenResty 为终极优化方案,效果极为优异。
调试Lua,需要多看OpenResty官方文档,最终指标必须完全符合自己的Lua脚本逻辑,才算完成。
额外
本片文章 刚好可以作为下列问题的临时可靠解决方案