diff --git a/module/monitor/driver/influxdb-v2/executor.go b/module/monitor/driver/influxdb-v2/executor.go index 2fc7ab26..6949b710 100644 --- a/module/monitor/driver/influxdb-v2/executor.go +++ b/module/monitor/driver/influxdb-v2/executor.go @@ -425,29 +425,26 @@ func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.T r := new(monitor.StatusCodeOverview) if s2xxRequestLen > i { r.Status2xx = s2xxRequest[i] - totalOverview.Status2xx += r.Status2xx } if s4xxRequestLen > i { r.Status4xx = s4xxRequest[i] - totalOverview.Status4xx += r.Status4xx } if s5xxRequestLen > i { r.Status5xx = s5xxRequest[i] - totalOverview.Status5xx += r.Status5xx } if s2xxResponseLen > i { r.Status2xx += s2xxResponse[i] - totalOverview.Status2xx += r.Status2xx } if s4xxResponseLen > i { r.Status4xx += s4xxResponse[i] - totalOverview.Status4xx += r.Status4xx } if s5xxResponseLen > i { r.Status5xx += s5xxResponse[i] - totalOverview.Status5xx += r.Status5xx } r.StatusTotal += r.Status2xx + r.Status4xx + r.Status5xx + totalOverview.Status2xx += r.Status2xx + totalOverview.Status4xx += r.Status4xx + totalOverview.Status5xx += r.Status5xx totalOverview.StatusTotal += r.StatusTotal result = append(result, r) @@ -686,9 +683,9 @@ func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time. } func (e *executor) ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error) { - newStartTime, every, _, bucket := getTimeIntervalAndBucket(start, end) + newStartTime, every, offset, bucket := getTimeIntervalAndBucket(start, end) filters := formatFilter(wheres) - return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, "app") + return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, offset, "app") } diff --git a/module/monitor/driver/influxdb-v2/flux/flux.go b/module/monitor/driver/influxdb-v2/flux/flux.go index 84be97cb..38a59206 100644 --- a/module/monitor/driver/influxdb-v2/flux/flux.go +++ b/module/monitor/driver/influxdb-v2/flux/flux.go @@ -15,7 +15,7 @@ type IFluxQuery interface { CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) - CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, tag string) (int64, map[time.Time]int64, error) + CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) // CommonQueryOnce 查询只返回一条结果 CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error) @@ -172,8 +172,8 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s return dates, resultMap, nil } -func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, tag string) (int64, map[time.Time]int64, error) { - query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, tag) +func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) { + query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, offset, tag) log.Info("flux sql=", query) result, err := queryApi.Query(ctx, query) if err != nil { @@ -184,7 +184,7 @@ func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI tagMap := make(map[string]struct{}) defer result.Close() for result.Next() { - date := result.Record().Values()["_stop"].(time.Time).In(time.Local) + date := result.Record().Values()["_start"].(time.Time).In(time.Local) if _, ok := dateMap[date]; !ok { dateMap[date] = map[string]struct{}{} } @@ -355,15 +355,19 @@ func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, fi } -func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, tag string) string { +func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, offset, tag string) string { + windowOffset := "" + if len(offset) > 0 { + windowOffset = fmt.Sprintf(", offset: %s", offset) + } return fmt.Sprintf(` from(bucket: "%s") |> range(start: %d, stop: %d) |> filter(fn: (r) => r["_measurement"] == "%s") %s |> keep(columns: ["_time", "%s"]) - |> window(every: %s) - |> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, tag) + |> window(every: %s%s) + |> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, windowOffset, tag) } // assembleTendencyFieldCondition 封装趋势图需要的Field数据 diff --git a/module/monitor/iml.go b/module/monitor/iml.go index 00bbd60b..076f4bc8 100644 --- a/module/monitor/iml.go +++ b/module/monitor/iml.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "sort" "sync" "time" @@ -108,6 +109,12 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service avgRequestPerSubscriber := 0.0 if consumerNum != 0 { avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum) + if avgRequestPerSubscriber > result.MaxRequestPerSubscriber { + result.MaxRequestPerSubscriber = avgRequestPerSubscriber + } + } + if result.MinRequestPerSubscriber == 0 || result.MinRequestPerSubscriber > avgRequestPerSubscriber { + result.MinRequestPerSubscriber = avgRequestPerSubscriber } result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber) result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{ @@ -169,6 +176,12 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service avgTotalPerSubscriber = float64(item.TotalToken) / float64(consumerNum) avgOutputPerSubscriber = float64(item.OutputToken) / float64(consumerNum) avgInputPerSubscriber = float64(item.InputToken) / float64(consumerNum) + if avgTotalPerSubscriber > result.MaxTokenPerSubscriber { + result.MaxTokenPerSubscriber = avgTotalPerSubscriber + } + } + if result.MinTokenPerSubscriber == 0 || result.MinTokenPerSubscriber > avgTotalPerSubscriber { + result.MinTokenPerSubscriber = avgTotalPerSubscriber } result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenFloatOverview{ @@ -178,13 +191,6 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service }) } - //avgTokenPerSubscriber := 0.0 - //if totalConsumerCount != 0 { - // avgTokenPerSubscriber = float64(summary.TotalToken) / float64(totalConsumerCount) - //} - //result.AvgToken = avgTokenPerSubscriber - //result.MaxToken = maxToken - //result.MinToken = minToken result.TokenTotal = summary.TotalToken result.InputTokenTotal = summary.InputToken result.OutputTokenTotal = summary.OutputToken @@ -202,12 +208,14 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service if len(errs) > 0 { return nil, fmt.Errorf("errors occurred: %v", errs) } - var maxTokenPerSecond, minTokenPerSecond, avgTokenPerSecond float64 = 0, 0, 0 + sumResponseTime := 0.0 + var maxTokenPerSecond, minTokenPerSecond float64 = 0, 0 + for index, token := range totalTokens { var p float64 = 0 if len(sumResponseTimes) > index && sumResponseTimes[index] > 0 { - // 由于时间单位是ms,因此需要✖️1000 - p = float64(token) * 1000 / float64(sumResponseTimes[index]) + p = math.Round(float64(token)*1000*100/float64(sumResponseTimes[index])) / 100 + sumResponseTime += float64(sumResponseTimes[index]) } result.AvgTokenOverview = append(result.AvgTokenOverview, p) if maxTokenPerSecond < p { @@ -216,10 +224,9 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service if minTokenPerSecond == 0 || minTokenPerSecond > p { minTokenPerSecond = p } - avgTokenPerSecond += p } - if len(sumResponseTimes) > 0 { - result.AvgToken = avgTokenPerSecond / float64(len(sumResponseTimes)) + if sumResponseTime > 0 { + result.AvgToken = math.Round(float64(result.TokenTotal)*1000*100/sumResponseTime) / 100 } result.MaxToken = maxTokenPerSecond result.MinToken = minTokenPerSecond @@ -258,12 +265,17 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items)) result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items)) for index, item := range items { - t := date[index] - log.Infof("date: %v, item: %v", t, item) consumerNum := consumerMap[date[index]] avgRequestPerSubscriber := 0.0 if consumerNum != 0 { avgRequestPerSubscriber = float64(summary.StatusTotal) / float64(consumerNum) + if avgRequestPerSubscriber > result.MaxRequestPerSubscriber { + result.MaxRequestPerSubscriber = avgRequestPerSubscriber + } + + } + if result.MinRequestPerSubscriber == 0 || avgRequestPerSubscriber < result.MinRequestPerSubscriber { + result.MinRequestPerSubscriber = avgRequestPerSubscriber } result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber) result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{ @@ -272,10 +284,7 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi Status5xx: item.Status5xx, }) } - //avgRequestPerSubscriber := 0.0 - //if totalConsumerCount != 0 { - // avgRequestPerSubscriber = float64(summary.StatusTotal) / float64(totalConsumerCount) - //} + result.RequestTotal = summary.StatusTotal result.Request2xxTotal = summary.Status2xx result.Request4xxTotal = summary.Status4xx @@ -318,6 +327,13 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi avgTrafficPerSubscriber := 0.0 if consumerNum != 0 { avgTrafficPerSubscriber = float64(item.StatusTotal) / float64(consumerNum) + if avgTrafficPerSubscriber > result.MaxTrafficPerSubscriber { + result.MaxTrafficPerSubscriber = avgTrafficPerSubscriber + } + + } + if result.MinTrafficPerSubscriber == 0 || result.MinTrafficPerSubscriber > avgTrafficPerSubscriber { + result.MinTrafficPerSubscriber = avgTrafficPerSubscriber } result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, avgTrafficPerSubscriber) } @@ -425,10 +441,15 @@ func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, s appMap := utils.SliceToMap(apps, func(t *service.Service) string { return t.Id }) + appMap["apipark-global"] = &service.Service{ + Id: "apipark-global", + Name: "System Consumer", + } for _, item := range result { if v, ok := appMap[item.Key]; ok { consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind)) } + } }()