Merge branch 'feature/liujian-1.8' into 'main'

Feature/liujian 1.8

See merge request apipark/APIPark!371
This commit is contained in:
刘健
2025-05-06 12:04:47 +08:00
3 changed files with 56 additions and 34 deletions
@@ -425,29 +425,26 @@ func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.T
r := new(monitor.StatusCodeOverview)
if s2xxRequestLen > i {
r.Status2xx = s2xxRequest[i]
totalOverview.Status2xx += r.Status2xx
}
if s4xxRequestLen > i {
r.Status4xx = s4xxRequest[i]
totalOverview.Status4xx += r.Status4xx
}
if s5xxRequestLen > i {
r.Status5xx = s5xxRequest[i]
totalOverview.Status5xx += r.Status5xx
}
if s2xxResponseLen > i {
r.Status2xx += s2xxResponse[i]
totalOverview.Status2xx += r.Status2xx
}
if s4xxResponseLen > i {
r.Status4xx += s4xxResponse[i]
totalOverview.Status4xx += r.Status4xx
}
if s5xxResponseLen > i {
r.Status5xx += s5xxResponse[i]
totalOverview.Status5xx += r.Status5xx
}
r.StatusTotal += r.Status2xx + r.Status4xx + r.Status5xx
totalOverview.Status2xx += r.Status2xx
totalOverview.Status4xx += r.Status4xx
totalOverview.Status5xx += r.Status5xx
totalOverview.StatusTotal += r.StatusTotal
result = append(result, r)
@@ -686,9 +683,9 @@ func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.
}
func (e *executor) ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error) {
newStartTime, every, _, bucket := getTimeIntervalAndBucket(start, end)
newStartTime, every, offset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, "app")
return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, offset, "app")
}
+11 -7
View File
@@ -15,7 +15,7 @@ type IFluxQuery interface {
CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error)
CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, tag string) (int64, map[time.Time]int64, error)
CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error)
// CommonQueryOnce 查询只返回一条结果
CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error)
CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error)
@@ -172,8 +172,8 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s
return dates, resultMap, nil
}
func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, tag string) (int64, map[time.Time]int64, error) {
query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, tag)
func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) {
query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, offset, tag)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
if err != nil {
@@ -184,7 +184,7 @@ func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI
tagMap := make(map[string]struct{})
defer result.Close()
for result.Next() {
date := result.Record().Values()["_stop"].(time.Time).In(time.Local)
date := result.Record().Values()["_start"].(time.Time).In(time.Local)
if _, ok := dateMap[date]; !ok {
dateMap[date] = map[string]struct{}{}
}
@@ -355,15 +355,19 @@ func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, fi
}
func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, tag string) string {
func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, offset, tag string) string {
windowOffset := ""
if len(offset) > 0 {
windowOffset = fmt.Sprintf(", offset: %s", offset)
}
return fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
|> keep(columns: ["_time", "%s"])
|> window(every: %s)
|> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, tag)
|> window(every: %s%s)
|> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, windowOffset, tag)
}
// assembleTendencyFieldCondition 封装趋势图需要的Field数据
+40 -19
View File
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"
@@ -108,6 +109,12 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
}
if result.MinRequestPerSubscriber == 0 || result.MinRequestPerSubscriber > avgRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
@@ -169,6 +176,12 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
avgTotalPerSubscriber = float64(item.TotalToken) / float64(consumerNum)
avgOutputPerSubscriber = float64(item.OutputToken) / float64(consumerNum)
avgInputPerSubscriber = float64(item.InputToken) / float64(consumerNum)
if avgTotalPerSubscriber > result.MaxTokenPerSubscriber {
result.MaxTokenPerSubscriber = avgTotalPerSubscriber
}
}
if result.MinTokenPerSubscriber == 0 || result.MinTokenPerSubscriber > avgTotalPerSubscriber {
result.MinTokenPerSubscriber = avgTotalPerSubscriber
}
result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenFloatOverview{
@@ -178,13 +191,6 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
})
}
//avgTokenPerSubscriber := 0.0
//if totalConsumerCount != 0 {
// avgTokenPerSubscriber = float64(summary.TotalToken) / float64(totalConsumerCount)
//}
//result.AvgToken = avgTokenPerSubscriber
//result.MaxToken = maxToken
//result.MinToken = minToken
result.TokenTotal = summary.TotalToken
result.InputTokenTotal = summary.InputToken
result.OutputTokenTotal = summary.OutputToken
@@ -202,12 +208,14 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
var maxTokenPerSecond, minTokenPerSecond, avgTokenPerSecond float64 = 0, 0, 0
sumResponseTime := 0.0
var maxTokenPerSecond, minTokenPerSecond float64 = 0, 0
for index, token := range totalTokens {
var p float64 = 0
if len(sumResponseTimes) > index && sumResponseTimes[index] > 0 {
// 由于时间单位是ms,因此需要✖️1000
p = float64(token) * 1000 / float64(sumResponseTimes[index])
p = math.Round(float64(token)*1000*100/float64(sumResponseTimes[index])) / 100
sumResponseTime += float64(sumResponseTimes[index])
}
result.AvgTokenOverview = append(result.AvgTokenOverview, p)
if maxTokenPerSecond < p {
@@ -216,10 +224,9 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
if minTokenPerSecond == 0 || minTokenPerSecond > p {
minTokenPerSecond = p
}
avgTokenPerSecond += p
}
if len(sumResponseTimes) > 0 {
result.AvgToken = avgTokenPerSecond / float64(len(sumResponseTimes))
if sumResponseTime > 0 {
result.AvgToken = math.Round(float64(result.TokenTotal)*1000*100/sumResponseTime) / 100
}
result.MaxToken = maxTokenPerSecond
result.MinToken = minTokenPerSecond
@@ -258,12 +265,17 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for index, item := range items {
t := date[index]
log.Infof("date: %v, item: %v", t, item)
consumerNum := consumerMap[date[index]]
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(summary.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
}
if result.MinRequestPerSubscriber == 0 || avgRequestPerSubscriber < result.MinRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
@@ -272,10 +284,7 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi
Status5xx: item.Status5xx,
})
}
//avgRequestPerSubscriber := 0.0
//if totalConsumerCount != 0 {
// avgRequestPerSubscriber = float64(summary.StatusTotal) / float64(totalConsumerCount)
//}
result.RequestTotal = summary.StatusTotal
result.Request2xxTotal = summary.Status2xx
result.Request4xxTotal = summary.Status4xx
@@ -318,6 +327,13 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi
avgTrafficPerSubscriber := 0.0
if consumerNum != 0 {
avgTrafficPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgTrafficPerSubscriber > result.MaxTrafficPerSubscriber {
result.MaxTrafficPerSubscriber = avgTrafficPerSubscriber
}
}
if result.MinTrafficPerSubscriber == 0 || result.MinTrafficPerSubscriber > avgTrafficPerSubscriber {
result.MinTrafficPerSubscriber = avgTrafficPerSubscriber
}
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, avgTrafficPerSubscriber)
}
@@ -425,10 +441,15 @@ func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, s
appMap := utils.SliceToMap(apps, func(t *service.Service) string {
return t.Id
})
appMap["apipark-global"] = &service.Service{
Id: "apipark-global",
Name: "System Consumer",
}
for _, item := range result {
if v, ok := appMap[item.Key]; ok {
consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind))
}
}
}()