拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 Python呼叫Prometheus监控资料并计算

Python呼叫Prometheus监控资料并计算

白鹭 - 2022-02-18 2071 0 0

Prometheus是什么

Prometheus是一套开源监控系统和告警为一体,由go语言(golang)开发,是监控+报警+时间序列数
据库的组合,适合监控docker容器,因为kubernetes(k8s)的流行带动其发展,

Prometheus的主要特点

  • 多维度资料模型,由指标名称和键/值对标识的时间序列资料,
  • 作为一个时间序列数据库,其采集的资料会以档案的形式存盘在本地中,
  • 灵活的查询语言,PromQL(Prometheus Query Language)函式式查询语言,
  • 不依赖分布式存盘,单个服务器节点是自治的,
  • 以HTTP方式,通过pull模型拉取时间序列资料,
  • 也可以通过中间网关支持push模型,
  • 通过服务发现或者静态配置,来发现目标服务物件,
  • 支持多种多样的图表和界面展示,

Prometheus原理架构图

image-20211202173238726

Prometheus基础概念

什么是时间序列资料

时间序列资料(TimeSeries Data) : 按照时间顺序记录系统、设备状态变化的资料被称为时序资料,

应用的场景很多,如:

  • 无人驾驶运行中记录的经度,纬度,速度,方向,旁边物体距离等,
  • 某一个地区的各车辆的行驶轨迹资料,
  • 传统证券行业实时交易资料,
  • 实时运维监控资料等,

时间序列资料特点:

  • 性能好、存盘成本低

什么是targets(目标)

Prometheus 是一个监控平台,它通过抓取监控目标(targets)上的指标 HTTP 端点来从这些目标收集指标,

安装完Prometheus Server端之后,第一个targets就是它本身,

具体可以参考官方档案

什么是metrics(指标)

Prometheus存在多种不同的监控指标(Metrics),在不同的场景下应该要选择不同的Metrics,

Prometheus的merics型别有四种,分别为Counter、Gauge、Summary、Histogram,

  • Counter:只增不减的计数器
  • Gauge:可增可减的仪表盘
  • Histogram:分析资料分布情况
  • Summary:使用较少

简单了解即可,暂不需要深入理解,

通过浏览器访问http://被监控端IP:9100(被监控埠)/metrics

就可以查到node_exporter在被监控端收集的监控信息

什么是PromQL(函式式查询语言)

Prometheus内置了一个强大的资料查询语言PromQL, 通过PromQL可以实作对监控资料的查询、聚合,

同时PromQL也被应用于资料可视化(如Grafana)以及告警当中,

通过PromQL可以轻松回答以下问题:

  • 在过去一段时间中95%应用延迟时间的分布范围?
  • 预测在4小时后,磁盘空间占用大致会是什么情况?
  • CPU占用率前5位的服务有哪些?(过滤)

具体查询细节可以参考官方,

如何监控远程Linux主机

安装Prometheus组件其实很简单,下载包--解压--后台启动运行即可,不做具体演示,

在远程linux主机(被监控端)上安装node_exporter组件,可看下载地址

image-20211203114053559

下载解压后,里面就一个启动命令node_exporter,直接启动即可,

nohup /usr/local/node_exporter/node_exporter >/dev/null 2>&1 &
lsof -i:9100

nohup:如果直接启动node_exporter的话,终端关闭行程也会随之关闭,这个命令帮你解决问题,

Prometheus HTTP API

Prometheus 所有稳定的 HTTP API 都在 /api/v1 路径下,当我们有资料查询需求时,可以通过查询 API 请求监控资料,提交资料可以使用 remote write 协议或者 Pushgateway 的方式,

支持的 API

API 说明 需要认证 方法
/api/v1/query 查询界面 GET/POST
/api/v1/query_range 范围查询 GET/POST
/api/v1/series series 查询 GET/POST
/api/v1/labels labels 查询 GET/POST
/api/v1/label/<label_name>/values label value 查询 GET
/api/v1/prom/write remote write 资料提交 remote write
Pushgateway pushgateway 资料提交 SDK

认证方法

默认开启认证,因此所有的界面都需要认证,且所有的认证方式都支持 Bearer Token和 Basic Auth,

呼叫界面的时候,我们需要携带Basic Auth请求头的认证,否则会出现401,

Bearer Token

Bearer Token 随着实体产生而生成,可以通过控制台进行查询,了解 Bearer Token 更多信息,请参见 Bearer Authentication,

Basic Auth

Basic Auth 兼容原生 Prometheus Query 的认证方式,用户名为用户的 APPID,密码为 bearer token(实体产生时生成),可以通过控制台进行查询,了解 Basic Auth 更多信息,请参见 Basic Authentication,

资料回传格式

所有 API 的回应资料格式都为 JSON,每一次成功的请求会回传 2xx 状态码,

无效的请求会回传一个包含错误物件的 JSON 格式资料,同时也将包含一个如下表格的状态码:

状态码 含义
401 认证失败
400 当自变量缺失或错误时回传无效的请求状态码
422 当一个无效的表达式无法被指定时 (RFC4918)
503 当查询不可用或者被取消时回传服务不可用状态码

无效请求回应回传模板如下:

{
"status": "success" | "error",
"data": <data>,
 // 当 status 状态为 error 时,下面的资料将被回传
"errorType": "<string>",
"error": "<string>",
 // 当执行请求时有警告信息时,该栏位将被填充回传
"warnings": ["<string>"]
}

资料写入

运维程序不需要对资料进行写入,所以暂时不深入理解,

有兴趣的同学可以看看官方档案

监控资料查询

当我们有资料查询需求时,可以通过查询 API 请求监控资料,

  • 查询 API 界面
GET /api/v1/query
POST /api/v1/query

查询自变量:

? query= : Prometheus:查询表达式,

? time= <rfc3339 | unix_timestamp>: 时间戳, 可选,

? timeout= :检测超时时间, 可选, 默认由 -query.timeout 自变量指定,

  • 简单的查询

查询当前状态为up的监控主机:

curl -u "appid:token" 'http://IP:PORT/api/v1/query?query=up'
  • 范围查询
GET /api/v1/query_range
POST /api/v1/query_range

根据时间范围查询需要的资料,这也是我们用得最多的场景,

这时我们需要用到 /api/v1/query_range 界面,示例如下:

$ curl 'http://localhost:9090/api/v1/query_range?query=up&start=2015-07-01T20:10:30.781Z&end=2015-07-01T20:11:00.781Z&step=15s'
{
   "status" : "success",
   "data" : {
      "resultType" : "matrix",
      "result" : [
         {
            "metric" : {
               "__name__" : "up",
               "job" : "prometheus",
               "instance" : "localhost:9090"
            },
            "values" : [
               [ 1435781430.781, "1" ],
               [ 1435781445.781, "1" ],
               [ 1435781460.781, "1" ]
            ]
         },
         {
            "metric" : {
               "__name__" : "up",
               "job" : "node",
               "instance" : "localhost:9091"
            },
            "values" : [
               [ 1435781430.781, "0" ],
               [ 1435781445.781, "0" ],
               [ 1435781460.781, "1" ]
            ]
         }
      ]
   }
}

什么是Grafana

Grafana是一个开源的度量分析和可视化工具,可以通过将采集的资料分析、查询,

然后进行可视化的展示,并能实作报警,

网址: https://grafana.com/

使用Grafana连接Prometheus

连接不再做具体演示,操作思路如下:

  1. 在Grafana服务器上安装,下载地址:https://grafana.com/grafana/download
  2. 浏览器http://grafana服务器IP:3000登录,默认账号密码都是admin,就可以登陆了,
  3. 把Prometheus服务器收集的资料做为一个资料源添加到Grafana,得到Prometheus资料,
  4. 然后为添加好的资料源做图形显示,最后在dashboard就可以查看到,

操作流程不难,就不讲解重点,后面正式开始上查询脚本,

作业使用场景

作业中需要通过CPU、存储器生成资源利用率报表,可以通过Prometheus的API写一个Python脚本,

image-20211229154759297

可通过API获取资料,然后再进行资料排序、过滤、运算、聚合,最后写入Mysql数据库,

CPU峰值计算

  • 取最近一周CPU数值,再排序取最高的值,
def get_cpu_peak(self):
    """
        CPU取最近一周所有数值,再排序取最高的值,TOP1
        :return: {'IP' : value}
        """
    # 拼接URL
    pre_url = self.server_ip + '/api/v1/query_range?query='
    expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
    '&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    url = pre_url + expr
    # print(url)

    result = {}
    # 请求URL后将Json资料转为字典物件
    res = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
    # print(data)

    # 回圈取出字典里每个IP的values,排序取最高值,最后存入result字典
    for da in res.get('data').get('result'):
        values = da.get('values')
        cpu_values = [float(v[1]) for v in values]  # 取出数值并存入串列
        # 取出IP并消除埠号
        ip = da.get('metric').get('instance')
        ip = ip[:ip.index(':')] if ':' in ip else ip
        # if ip == '10.124.58.181':
        #     print (ip)
        # cpu_peak = round(sorted(cpu_values, reverse=True)[0], 2)
        cpu_peak = sorted(cpu_values, reverse=True)[0]
        # 取出IP和最高值之后,写入字典
        result[ip] = cpu_peak

        # print(result)
        return result

CPU均值计算

  • 取最近一周CPU每一天的TOP20除以20得到当时忙时平均值,
    再将7天平均值的和除以n,得到时间范围内忙时平均值,
def get_cpu_average(self):
    """
        CPU忙时平均值:取最近一周CPU资料,每一天的TOP20除以20得到忙时平均值;
        再将一周得到的忙时平均值相加,再除以7,得到时间范围内一周的忙时平均值,
        :return:
        """
    cpu_average = {}
    for t in range(len(self.time_list)):
        if t + 1 < len(self.time_list):
            start_time = self.time_list[t]
            end_time = self.time_list[t + 1]
            # print(start_time, end_time)
            # 拼接URL
            pre_url = server_ip + '/api/v1/query_range?query='
            expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
            '&start=%s&end=%s&step=300' % (start_time, end_time - 1)
            url = pre_url + expr
            # print(url)
            # 请求界面资料
            data = https://www.cnblogs.com/jiba/p/json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))

            for da in data.get('data').get('result'):   # 回圈拿到result资料
                values = da.get('values')
                cpu_load = [float(v[1]) for v in values]    # 回圈拿到values里面的所有值
                ip = da.get('metric').get('instance')       # 拿到instance里面的ip
                ip = ip[:ip.index(':')] if ':' in ip else ip    # 去除个别后面带的埠号
                # avg_cup_load = sum(sorted(cpu_load, reverse=True)[:20]) / 20
                # 取top20% 再除以20%,得出top20%的平均值
                # avg_cup_load = round(sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2), 2)
                # 倒序后取前面20%除以个数,得到前20%的平均值
                avg_cup_load = sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2)
                # print(avg_cup_load)
                # 将计算后的资料以ip为key写入字典
                if cpu_average.get(ip):
                    cpu_average[ip].append(avg_cup_load)
                    else:
                        cpu_average[ip] = [avg_cup_load]

                        # 每日top20的平均值累加,共7天的再除以7
                        for k, v in cpu_average.items():
                            # cpu_average[k] = round(sum(v) / 7, 2)
                            cpu_average[k] = sum(v)

                            # print(cpu_average)
                            return cpu_average

存储器峰值计算

  • 取7天存储器数值,排序后取最高峰值TOP1
def get_mem_peak(self):
    """
        存储器单台峰值:取7天存储器最高峰值TOP1
        :return: 7天存储器使用率最高峰值
        """
    pre_url = self.server_ip + '/api/v1/query_range?query='
    # expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
    # 字符太长会导致报错,所以这里进行拆分栏位计算
    expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)

    result = {}
    # 回圈分别取出总存储器、可用存储器、Buffer块、快取块四个栏位
    for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
        url = pre_url + ur
        data = https://www.cnblogs.com/jiba/p/json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
        ip_dict = {}
        # 回圈单个栏位所有值
        for da in data.get('data').get('result'):
            ip = da.get('metric').get('instance')
            ip = ip[:ip.index(':')] if ':' in ip else ip
            # if ip != '10.124.53.12':
            #     continue
            if ip_dict.get(ip):     # 过滤重复的ip,重复ip会导致计算多次
                # print("重复ip:%s" % (ip))
                continue
                values = da.get('values')
                # 将串列里的值转为字典方便计算
                values_dict = {}
                for v in values:
                    values_dict[str(v[0])] = v[1]
                    # 标记ip存在
                    ip_dict[ip] = True
                    # 建立串列追加字典
                    if result.get(ip):
                        result[ip].append(values_dict)
                        else:
                            result[ip] = [values_dict]

                            # print(result)
                            # 对取出的四个值进行计算,得出峰值
                            for ip, values in result.items():
                                values_list = []
                                for k, v in values[0].items():
                                    try:
                                        values_MenTotal = float(v)
                                        values_MemFree = float(values[1].get(k, 0))
                                        values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
                                        values_Cached = float(values[3].get(k, 0)) if values[3] else 0
                                        # 如果是0,不参与计算
                                        if values_MemFree==0.0 or values_Buffers==0.0 or values_Cached==0.0:
                                            continue
                                            # values_list.append(round((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100, 2))
                                            # 合并后计算,得出串列
                                            values_list.append((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100)
                                            # 对得出结果进行排序
                                            result[ip] = sorted(values_list, reverse=True)[0]
                                            except Exception as e:
                                                # print(values[0])
                                                logging.exception(e)

                                                # print(result)
                                                return result

存储器均值计算

  • 先取出7天的日期,根据多条链接回圈取出每天资料,排序value取top20除以20,最终7天资料再除以7
def get_mem_average(self):
    """
        存储器忙时平均值:先取出7天的日期,根据多条链接回圈取出每天资料,排序value取top20除以20,最终7天资料再除以7
        :return:
        """
    avg_mem_util = {}
    for t in range(len(self.time_list)):
        if t + 1 < len(self.time_list):
            start_time = self.time_list[t]
            end_time = self.time_list[t + 1]
            # 根据多条链接回圈取出每天资料
            pre_url = self.server_ip + '/api/v1/query_range?query='
            # expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
            expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)

            result = {}
            # 回圈取出四个栏位
            for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
                url = pre_url + ur
                data = https://www.cnblogs.com/jiba/p/json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
                ip_dict = {}
                # 回圈单个栏位所有值
                for da in data.get('data').get('result'):
                    ip = da.get('metric').get('instance')
                    ip = ip[:ip.index(':')] if ':' in ip else ip
                    if ip_dict.get(ip):
                        # print("重复ip:%s" % (ip))
                        continue
                        values = da.get('values')
                        # 将串列里的值转为字典方便计算
                        values_dict = {}
                        for v in values:
                            values_dict[str(v[0])] = v[1]
                            # 标记ip存在
                            ip_dict[ip] = True
                            # 建立串列追加字典
                            if result.get(ip):
                                result[ip].append(values_dict)
                                else:
                                    result[ip] = [values_dict]

                                    # print(result)
                                    for ip, values in result.items():
                                        values_list = []

                                        for k, v in values[0].items():
                                            try:
                                                values_MenTotal = float(v)
                                                values_MemFree = float(values[1].get(k, 0)) if values[1] else 0
                                                values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
                                                values_Cached = float(values[3].get(k, 0)) if values[3] else 0
                                                if values_MemFree == 0.0 or values_Buffers == 0.0 or values_Cached == 0.0:
                                                    continue
                                                    value_calc = (values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100
                                                    if value_calc != float(0):
                                                        values_list.append(value_calc)
                                                        except Exception as e:
                                                            print(values[0])
                                                            # logging.exception(e)
                                                            continue
                                                            # 排序value取top20除以20
                                                            # avg_mem = round(sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2), 2)
                                                            try:
                                                                avg_mem = sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2)
                                                                except Exception as e:
                                                                    avg_mem = 0
                                                                    logging.exception(e)

                                                                    if avg_mem_util.get(ip):
                                                                        avg_mem_util[ip].append(avg_mem)
                                                                        else:
                                                                            avg_mem_util[ip] = [avg_mem]

                                                                            # 最终7天资料再除以7
                                                                            for k, v in avg_mem_util.items():
                                                                                # avg_mem_util[k] = round(sum(v) / 7, 2)
                                                                                avg_mem_util[k] = sum(v)

                                                                                return avg_mem_util

汇出excel

  • 将采集到的资料汇出excel
def export_excel(self, export):
    """
        将采集到的资料汇出excel
        :param export: 资料集合
        :return:
        """
    try:
        # 将字典串列转换为DataFrame
        pf = pd.DataFrame(list(export))
        # 指定栏位顺序
        order = ['ip', 'cpu_peak', 'cpu_average', 'mem_peak', 'mem_average', 'collector']
        pf = pf[order]
        # 将列名替换为中文
        columns_map = {
            'ip': 'ip',
            'cpu_peak': 'CPU峰值利用率',
            'cpu_average': 'CPU忙时平均峰值利用率',
            'mem_peak': '存储器峰值利用率',
            'mem_average': '存储器忙时平均峰值利用率',
            'collector': '来源地址'
        }
        pf.rename(columns=columns_map, inplace=True)
        # 指定生成的Excel表格名称
        writer_name = self.Host + '.xlsx'
        writer_name.replace(':18600', '')
        # print(writer_name)
        file_path = pd.ExcelWriter(writer_name.replace(':18600', ''))
        # 替换空单元格
        pf.fillna(' ', inplace=True)
        # 输出
        pf.to_excel(file_path, encoding='utf-8', index=False)
        # 保存表格
        file_path.save()
        except Exception as e:
            print(e)
            logging.exception(e)

因为机房需要保留资料方便展示,后面改造成采集直接入库mysql,

---- 钢铁知识库 648403020@qq.com 2021.12.29

写在最后

以上简单介绍了Prometheus架构、基础概念、API使用,以及Python呼叫Prometheus的API部分示例,完整代码也已经上传,需要自取或联系即可,

下载链接:
https://download.csdn.net/download/u011463397/72150839

参考链接:

Prometheus操作指南:https://github.com/yunlzheng/prometheus-book

官方查询API:https://prometheus.io/docs/prometheus/latest/querying/api/

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *