diff --git a/controller/monitor/iml.go b/controller/monitor/iml.go index f1b26ada..1a532436 100644 --- a/controller/monitor/iml.go +++ b/controller/monitor/iml.go @@ -2,6 +2,7 @@ package monitor import ( "fmt" + "strconv" "time" "github.com/APIParkLab/APIPark/module/monitor" @@ -17,6 +18,66 @@ type imlMonitorStatisticController struct { module monitor.IMonitorStatisticModule `autowired:""` } +func (i *imlMonitorStatisticController) ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, err + } + return i.module.RestChartOverview(ctx, "", s, e) +} + +func (i *imlMonitorStatisticController) ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, err + } + return i.module.AIChartOverview(ctx, "", s, e) +} + +func (i *imlMonitorStatisticController) AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, nil, err + } + l, err := strconv.Atoi(limit) + if err != nil { + if limit == "" { + l = 10 + } else { + return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err) + } + } + return i.module.Top(ctx, "", s, e, l, "ai") +} + +func formatTime(start string, end string) (int64, int64, error) { + s, err := strconv.ParseInt(start, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err) + } + e, err := strconv.ParseInt(end, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err) + } + return s, e, nil +} + +func (i *imlMonitorStatisticController) RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, nil, 
err + } + l, err := strconv.Atoi(limit) + if err != nil { + if limit == "" { + l = 10 + } else { + return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err) + } + } + return i.module.Top(ctx, "", s, e, l, "rest") +} + func (i *imlMonitorStatisticController) Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) { switch dataType { case monitor_dto.DataTypeApi: diff --git a/controller/monitor/statistic.go b/controller/monitor/statistic.go index ca991f68..4f841162 100644 --- a/controller/monitor/statistic.go +++ b/controller/monitor/statistic.go @@ -22,6 +22,11 @@ type IMonitorStatisticController interface { InvokeTrendInner(ctx *gin.Context, dataType string, typ string, api string, provider string, subscriber string, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error) StatisticsInner(ctx *gin.Context, dataType string, typ string, id string, input *monitor_dto.StatisticInput) (interface{}, error) + + ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) + ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) + AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) + RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) } type IMonitorConfigController interface { diff --git a/controller/service/iml.go b/controller/service/iml.go index e912ac9f..aa5cebef 100644 --- a/controller/service/iml.go +++ b/controller/service/iml.go @@ -5,9 +5,13 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" + "github.com/APIParkLab/APIPark/module/monitor" + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" + ai_provider_local "github.com/APIParkLab/APIPark/ai-provider/local" subscribe_dto "github.com/APIParkLab/APIPark/module/subscribe/dto" @@ -66,20 +70,72 @@ var ( ) 
type imlServiceController struct { - module service.IServiceModule `autowired:""` - docModule service.IServiceDocModule `autowired:""` - subscribeModule subscribe.ISubscribeModule `autowired:""` - aiAPIModule ai_api.IAPIModule `autowired:""` - routerModule router.IRouterModule `autowired:""` - apiDocModule api_doc.IAPIDocModule `autowired:""` - providerModule ai.IProviderModule `autowired:""` - aiLocalModel ai_local.ILocalModelModule `autowired:""` - appModule service.IAppModule `autowired:""` - upstreamModule upstream.IUpstreamModule `autowired:""` - settingModule system.ISettingModule `autowired:""` - teamModule team.ITeamModule `autowired:""` - catalogueModule catalogue.ICatalogueModule `autowired:""` - transaction store.ITransaction `autowired:""` + module service.IServiceModule `autowired:""` + docModule service.IServiceDocModule `autowired:""` + subscribeModule subscribe.ISubscribeModule `autowired:""` + aiAPIModule ai_api.IAPIModule `autowired:""` + routerModule router.IRouterModule `autowired:""` + apiDocModule api_doc.IAPIDocModule `autowired:""` + providerModule ai.IProviderModule `autowired:""` + aiLocalModel ai_local.ILocalModelModule `autowired:""` + appModule service.IAppModule `autowired:""` + upstreamModule upstream.IUpstreamModule `autowired:""` + settingModule system.ISettingModule `autowired:""` + teamModule team.ITeamModule `autowired:""` + catalogueModule catalogue.ICatalogueModule `autowired:""` + monitorModule monitor.IMonitorStatisticModule `autowired:""` + transaction store.ITransaction `autowired:""` +} + +func (i *imlServiceController) AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartAIOverview, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, err + } + if serviceId == "" { + return nil, fmt.Errorf("service is required") + } + return i.monitorModule.AIChartOverview(ctx, serviceId, s, e) +} + +func (i *imlServiceController) RestChartOverview(ctx *gin.Context, 
serviceId string, start string, end string) (*monitor_dto.ChartRestOverview, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, err + } + if serviceId == "" { + return nil, fmt.Errorf("service is required") + } + return i.monitorModule.RestChartOverview(ctx, serviceId, s, e) +} + +func formatTime(start string, end string) (int64, int64, error) { + s, err := strconv.ParseInt(start, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err) + } + e, err := strconv.ParseInt(end, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err) + } + return s, e, nil +} + +func (i *imlServiceController) Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) { + if serviceId == "" { + return nil, nil, fmt.Errorf("serviceId is required") + } + info, err := i.module.Get(ctx, serviceId) + if err != nil { + return nil, nil, err + } + + s, e, err := formatTime(start, end) + if err != nil { + return nil, nil, err + } + + return i.monitorModule.Top(ctx, serviceId, s, e, 10, info.ServiceKind) } func (i *imlServiceController) QuickCreateAIService(ctx *gin.Context, input *service_dto.QuickCreateAIService) error { diff --git a/controller/service/service.go b/controller/service/service.go index 196d1bed..d443fc85 100644 --- a/controller/service/service.go +++ b/controller/service/service.go @@ -3,6 +3,8 @@ package service import ( "reflect" + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" + service_dto "github.com/APIParkLab/APIPark/module/service/dto" "github.com/gin-gonic/gin" @@ -32,6 +34,11 @@ type IServiceController interface { Swagger(ctx *gin.Context) ExportSwagger(ctx *gin.Context) + + Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) + + AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) 
(*monitor_dto.ChartAIOverview, error) + RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartRestOverview, error) } type IAppController interface { diff --git a/module/monitor/driver/driver.go b/module/monitor/driver/driver.go index a4b8d58e..8e5fc1cf 100644 --- a/module/monitor/driver/driver.go +++ b/module/monitor/driver/driver.go @@ -21,4 +21,24 @@ type IExecutor interface { InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error) + + IBasicOverview + + IRestOverview + + IAIOverview +} + +type IBasicOverview interface { + RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) + TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) +} + +type IRestOverview interface { + TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) + AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) +} + +type IAIOverview interface { + TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) } diff --git a/module/monitor/driver/influxdb-v2/executor.go b/module/monitor/driver/influxdb-v2/executor.go index 21077e67..b73b7b92 
100644 --- a/module/monitor/driver/influxdb-v2/executor.go +++ b/module/monitor/driver/influxdb-v2/executor.go @@ -3,6 +3,7 @@ package influxdb_v2 import ( "context" "encoding/json" + "fmt" "strings" "time" @@ -23,6 +24,9 @@ import ( "github.com/APIParkLab/APIPark/service/monitor" ) +var _ driver.IAIOverview = (*executor)(nil) +var _ driver.IRestOverview = (*executor)(nil) + func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) { var data InfluxdbV2Config err := json.Unmarshal([]byte(cfg), &data) @@ -147,7 +151,7 @@ func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.T fieldsConditions := []string{"request", "response"} - dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset) + dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn) if err != nil { return nil, "", err } @@ -166,9 +170,9 @@ func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Tim filters := formatFilter(wheres) - proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"} + proxyConditions := []string{"p_total", "p_success", "p_s2xx", "p_s4xx", "p_s5xx"} - dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset) + dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn) if err != nil { return nil, "", err } @@ -200,9 +204,9 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) filters := formatFilter(wheres) - requestConditions := []string{"total", "success", "s4xx", "s5xx"} + 
requestConditions := []string{"total", "success", "s2xx", "s4xx", "s5xx"} - dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset) + dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn) if err != nil { return nil, "", err } @@ -221,7 +225,7 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti proxyConditions := []string{"p_total", "p_success"} - _, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset) + _, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn) if err != nil { return nil, "", err } @@ -361,3 +365,233 @@ func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, g return resultMap, nil } + +func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + + filters := formatFilter(wheres) + + fieldsConditions := []string{"s2xx_request", "s4xx_request", "s5xx_request", "s2xx_response", "s4xx_response", "s5xx_response"} + + dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn) + if err != nil { + return nil, nil, nil, err + } + s2xxRequest := groupValues["s2xx_request"] + s4xxRequest := groupValues["s4xx_request"] + s5xxRequest := groupValues["s5xx_request"] + + s2xxResponse := groupValues["s2xx_response"] + s4xxResponse := groupValues["s4xx_response"] 
+ s5xxResponse := groupValues["s5xx_response"] + totalOverview := new(monitor.StatusCodeOverview) + result := make([]*monitor.StatusCodeOverview, 0, len(dates)) + for i := range dates { + overview := new(monitor.StatusCodeOverview) + overview.Status2xx = s2xxRequest[i] + s2xxResponse[i] + overview.Status4xx = s4xxRequest[i] + s4xxResponse[i] + overview.Status5xx = s5xxRequest[i] + s5xxResponse[i] + overview.StatusTotal = overview.Status2xx + overview.Status4xx + overview.Status5xx + + totalOverview.StatusTotal += overview.StatusTotal + totalOverview.Status2xx += overview.Status2xx + totalOverview.Status4xx += overview.Status4xx + totalOverview.Status5xx += overview.Status5xx + result = append(result, overview) + } + + return dates, totalOverview, result, nil +} + +func (e *executor) aggregateSummary(ctx context.Context, start time.Time, end time.Time, measurement string, bucket string, filters string, fields []string) (map[string]*monitor.Aggregate, error) { + if len(fields) == 0 { + return nil, fmt.Errorf("fields is empty") + } + maxFields := make([]string, 0, len(fields)) + minFields := make([]string, 0, len(fields)) + avgFields := make([]string, 0, len(fields)) + for _, field := range fields { + maxFields = append(maxFields, field+"_max") + minFields = append(minFields, field+"_min") + avgFields = append(avgFields, field+"_avg") + } + maxRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{ + Measurement: measurement, + AggregateFn: "max()", + Fields: maxFields, + }) + if err != nil { + return nil, err + } + minRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{ + Measurement: measurement, + AggregateFn: "min()", + Fields: minFields, + }) + if err != nil { + return nil, err + } + avgRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{ + Measurement: measurement, + AggregateFn: 
"mean()", + Fields: avgFields, + }) + if err != nil { + return nil, err + } + result := make(map[string]*monitor.Aggregate) + for _, field := range fields { + // Comma-ok assertions: a missing or differently typed value yields 0 instead of panicking (e.g. a range with no data). + avg, _ := avgRes[field+"_avg"].(float64) + lo, _ := minRes[field+"_min"].(int64) + hi, _ := maxRes[field+"_max"].(int64) + result[field] = &monitor.Aggregate{Avg: int64(avg), Min: lo, Max: hi} + } + + return result, nil + +} + +func (e *executor) AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + filters := formatFilter(wheres) + + fieldsConditions := []string{"timing_avg"} + + agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"}) + if err != nil { + return nil, nil, nil, err + } + + dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.AvgFn) + if err != nil { + return nil, nil, nil, err + } + + timingAvg := groupValues["timing_avg"] + timingAvgLen := len(timingAvg) + result := make([]int64, 0, len(dates)) + for i := range dates { + if timingAvgLen > i { + result = append(result, timingAvg[i]) + } + } + + return dates, agg["timing"], result, nil + +} + +func (e *executor) RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + filters := formatFilter(wheres) + + requestConditions := []string{"total", "s2xx", "s4xx", "s5xx"} + + dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn) + if err != nil { + return nil, nil, nil, err + } + total := 
requestValues["total"] + totalLen := len(total) + s2xx := requestValues["s2xx"] + s2xxLen := len(s2xx) + s4xx := requestValues["s4xx"] + s4xxLen := len(s4xx) + s5xx := requestValues["s5xx"] + s5xxLen := len(s5xx) + totalOverview := new(monitor.StatusCodeOverview) + result := make([]*monitor.StatusCodeOverview, 0, len(dates)) + for i := range dates { + r := new(monitor.StatusCodeOverview) + if totalLen > i { + r.StatusTotal = total[i] + totalOverview.StatusTotal += r.StatusTotal + } + if s2xxLen > i { + r.Status2xx = s2xx[i] + totalOverview.Status2xx += r.Status2xx + } + if s4xxLen > i { + r.Status4xx = s4xx[i] + totalOverview.Status4xx += r.Status4xx + } + if s5xxLen > i { + r.Status5xx = s5xx[i] + totalOverview.Status5xx += r.Status5xx + } + result = append(result, r) + } + return dates, totalOverview, result, nil +} + +func (e *executor) TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) { + filters := formatFilter(wheres) + newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end) + + statisticsConf := []*flux.StatisticsFilterConf{ + { + Measurement: "request", + AggregateFn: "sum()", + Fields: []string{"total", "request", "total_token"}, + }, + { + Measurement: "proxy", + AggregateFn: "sum()", + Fields: []string{"p_total"}, + }, + } + + results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit) + if err != nil { + return nil, err + } + topN := make([]*monitor.TopN, 0, len(results)) + for key, result := range results { + n := new(monitor.TopN) + n.Key = key + n.Request = result.Total + n.Token = result.TotalToken + n.Traffic = result.TotalRequest + topN = append(topN, n) + } + + return topN, nil +} + +func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) { + 
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + filters := formatFilter(wheres) + + requestConditions := []string{"total_token", "input_token", "output_token"} + + dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn) + if err != nil { + return nil, nil, nil, err + } + total := requestValues["total_token"] + totalLen := len(total) + input := requestValues["input_token"] + inputLen := len(input) + output := requestValues["output_token"] + outputLen := len(output) + totalOverview := new(monitor.TokenOverview) + result := make([]*monitor.TokenOverview, 0, len(dates)) + for i := range dates { + r := new(monitor.TokenOverview) + if totalLen > i { + r.TotalToken = total[i] + totalOverview.TotalToken += r.TotalToken + } + if inputLen > i { + r.InputToken = input[i] + totalOverview.InputToken += r.InputToken + } + if outputLen > i { + r.OutputToken = output[i] + totalOverview.OutputToken += r.OutputToken + } + + result = append(result, r) + } + return dates, totalOverview, result, nil +} diff --git a/module/monitor/driver/influxdb-v2/flux/flux.go b/module/monitor/driver/influxdb-v2/flux/flux.go index 291cc775..02b3b73f 100644 --- a/module/monitor/driver/influxdb-v2/flux/flux.go +++ b/module/monitor/driver/influxdb-v2/flux/flux.go @@ -14,7 +14,7 @@ import ( type IFluxQuery interface { CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) - CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, 
every, windowOffset string) ([]time.Time, map[string][]int64, error) + CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) // CommonQueryOnce 查询只返回一条结果 CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error) @@ -61,6 +61,9 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI, totalRequest := common.FmtIntFromInterface(maps["request"]) maxRequest := common.FmtIntFromInterface(maps["request_max"]) minRequest := common.FmtIntFromInterface(maps["request_min"]) + totalToken := common.FmtIntFromInterface(maps["total_token"]) + maxToken := common.FmtIntFromInterface(maps["total_token_max"]) + minToken := common.FmtIntFromInterface(maps["total_token_min"]) resultMap[key] = &FluxStatistics{ Total: total, @@ -73,6 +76,9 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI, TotalRequest: totalRequest, RequestMax: maxRequest, RequestMin: minRequest, + TotalToken: totalToken, + TokenMax: maxToken, + TokenMin: minToken, } } @@ -128,10 +134,10 @@ func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.Quer return resultMap, nil } -func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) { +func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, 
fn AggregateFn) ([]time.Time, map[string][]int64, error) { fieldConditions := f.assembleTendencyFieldCondition(dataFields) //拼装请求 - query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset) + query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset, fn) log.Info("flux sql=", query) result, err := queryApi.Query(ctx, query) @@ -148,15 +154,12 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s //初始返回内容 dates := make([]time.Time, 0, len(resultList)) resultMap := make(map[string][]int64, len(dataFields)) - for _, field := range dataFields { - resultMap[field] = make([]int64, 0, len(resultList)) - } - for _, res := range resultList { for _, field := range dataFields { resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field])) } t, _ := res["_time"].(time.Time) + dates = append(dates, t) } @@ -270,7 +273,7 @@ from(bucket: "%s") } return fmt.Sprintf(` -union(tables: [ +union(tables: [ %s ]) |> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value") @@ -278,20 +281,45 @@ union(tables: [ `, strings.Join(streams, ",\n"), groupBy, limitStr) } -func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string { +// AggregateFn names the Flux aggregate function applied to each aggregateWindow bucket. +type AggregateFn string + +const ( + SumFn AggregateFn = "sum" + MaxFn AggregateFn = "max" + MinFn AggregateFn = "min" + AvgFn AggregateFn = "mean" +) + +var ( + // fns is the allow-list of aggregate functions accepted by assembleTendencyFlux; every exported constant must be listed here. + fns = map[AggregateFn]struct{}{ + SumFn: {}, + MaxFn: {}, + MinFn: {}, + AvgFn: {}, + } +) + +func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every, windowOffset string, fn AggregateFn) string { windowOffsetFlux := "" if windowOffset != "" { windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset) } + // An aggregate function outside the allow-list falls back to sum, the behaviour before fn was configurable. + if _, ok := fns[fn]; !ok { + fn = SumFn + } + return fmt.Sprintf(`from(bucket: "%s") |> range(start: %d, stop: %d) |> 
filter(fn: (r) => r["_measurement"] == "%s") %s %s |> group(columns: ["_field"]) - |> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s) + |> aggregateWindow(every: %s, fn: %s, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table, - filters, fieldConditions, every, windowOffsetFlux) + filters, fieldConditions, every, string(fn), windowOffsetFlux) } diff --git a/module/monitor/driver/influxdb-v2/flux/monitor_flux.go b/module/monitor/driver/influxdb-v2/flux/monitor_flux.go index 8acbd155..1297f165 100644 --- a/module/monitor/driver/influxdb-v2/flux/monitor_flux.go +++ b/module/monitor/driver/influxdb-v2/flux/monitor_flux.go @@ -2,22 +2,27 @@ package flux // FluxStatistics flux统计通用字段 type FluxStatistics struct { - Total int64 `json:"total"` //总数 - Success int64 `json:"success"` //成功数 - ProxyTotal int64 `json:"p_total"` //转发总数 - ProxySuccess int64 `json:"p_success"` //转发成功数 - TotalTiming int64 `json:"timing"` //平均响应时间 - MaxTiming int64 `json:"timing_max"` //最大响应时间 - MinTiming int64 `json:"timing_min"` //最小响应时间 - TotalRequest int64 `json:"request"` //总请求流量 - RequestMax int64 `json:"request_max"` //最大流量 - RequestMin int64 `json:"request_min"` //最小流量 + Total int64 `json:"total"` //总数 + Success int64 `json:"success"` //成功数 + S2xx int64 `json:"s2xx"` //2xx + ProxyTotal int64 `json:"p_total"` //转发总数 + ProxySuccess int64 `json:"p_success"` //转发成功数 + TotalTiming int64 `json:"timing"` //平均响应时间 + MaxTiming int64 `json:"timing_max"` //最大响应时间 + MinTiming int64 `json:"timing_min"` //最小响应时间 + TotalRequest int64 `json:"request"` //总请求流量 + RequestMax int64 `json:"request_max"` //最大流量 + RequestMin int64 `json:"request_min"` //最小流量 + TotalToken int64 `json:"total_token"` //总token流量 + TokenMax int64 `json:"total_token_max"` //最大token流量 + TokenMin int64 `json:"total_token_min"` //最小token流量 } // 
FluxWarnStatistics flux统计告警通用字段 type FluxWarnStatistics struct { Total int64 `json:"total"` //总数 Success int64 `json:"success"` //成功数 + S2xx int64 `json:"s2xx"` S4xx int64 `json:"s4xx"` S5xx int64 `json:"s5xx"` ProxyTotal int64 `json:"p_total"` //转发总数 diff --git a/module/monitor/driver/influxdb-v2/flux/tasks/day.yaml b/module/monitor/driver/influxdb-v2/flux/tasks/day.yaml new file mode 100644 index 00000000..0ed5c8fe --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/tasks/day.yaml @@ -0,0 +1,294 @@ +- + task_name: "apinto_day_request_v1" + cron: "0 0 * * *" + offset: "2m30s" + flux: | + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max" + or + r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + 
"upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min" + or + r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> min() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg" + or + r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> mean() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) +- + task_name: "apinto_day_proxy_v1" + cron: "0 0 * * *" + offset: "2m45s" + flux: | + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + 
"api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> min() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) \ No newline at end of file diff --git a/module/monitor/driver/influxdb-v2/flux/tasks/hour.yaml b/module/monitor/driver/influxdb-v2/flux/tasks/hour.yaml new file mode 100644 index 00000000..611a8f37 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/tasks/hour.yaml @@ -0,0 +1,294 @@ +- + task_name: 
"apinto_hour_request_v1" + cron: "0 * * * *" + offset: "1m30s" + flux: | + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> sum() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max" + or + r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min" + or + r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + 
], + ) + |> min() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg" + or + r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> mean() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) +- + task_name: "apinto_hour_proxy_v1" + cron: "0 * * * *" + offset: "1m45s" + flux: | + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> sum() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", 
+ "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + "_measurement", + ], + ) + |> min() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) diff --git a/module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml b/module/monitor/driver/influxdb-v2/flux/tasks/minute.yaml similarity index 59% rename from module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml rename to module/monitor/driver/influxdb-v2/flux/tasks/minute.yaml index f5f09c49..51707cb7 100644 --- a/module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml +++ b/module/monitor/driver/influxdb-v2/flux/tasks/minute.yaml @@ -3,7 +3,6 @@ cron: "* * * * *" offset: "10s" flux: | - request_request = from(bucket: "apinto") |> range(start: -1m) @@ -18,6 +17,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -25,49 +25,69 @@ |> sum() |> set(key: "_field", value: "request") |> to( - bucket: "apinto/minute", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + 
"cluster", + "provider", + "api_kind", + ], ) request_request |> max() |> set(key: "_field", value: "request_max") |> to( - bucket: "apinto/minute", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - timeColumn: "_start", + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + timeColumn: "_start", ) request_request |> min() |> set(key: "_field", value: "request_min") |> to( - bucket: "apinto/minute", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + request_request + |> mean() + |> set(key: "_field", value: "request_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], ) - task_name: "apinto_minute_request_response_v1" @@ -89,6 +109,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -107,6 +128,7 @@ "node", "cluster", "provider", + "api_kind", ], ) request_response @@ -122,6 +144,7 @@ "node", "cluster", "provider", + "api_kind", ], timeColumn: "_start", ) @@ -139,6 +162,24 @@ "node", "cluster", "provider", + "api_kind", + ], + ) + request_response + |> mean() + |> set(key: "_field", value: "response_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", ], ) - @@ -161,6 +202,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -179,6 +221,7 @@ "node", "cluster", "provider", + "api_kind", ], ) request_retry @@ -194,6 +237,7 @@ "node", "cluster", "provider", + "api_kind", ], timeColumn: 
"_start", ) @@ -211,6 +255,7 @@ "node", "cluster", "provider", + "api_kind", ], ) - @@ -233,6 +278,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -251,6 +297,7 @@ "node", "cluster", "provider", + "api_kind", ], ) request_status @@ -267,6 +314,25 @@ "node", "cluster", "provider", + "api_kind", + ], + timeColumn: "_start", + ) + request_status + |> filter(fn: (r) => r._value >= 200 and r._value < 300) + |> count() + |> set(key: "_field", value: "s2xx") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", ], timeColumn: "_start", ) @@ -285,6 +351,7 @@ "node", "cluster", "provider", + "api_kind", ], ) request_status @@ -302,6 +369,7 @@ "node", "cluster", "provider", + "api_kind", ], ) - @@ -323,6 +391,8 @@ "method", "node", "cluster", + "provider", + "api_kind", "_measurement", ], ) @@ -341,6 +411,7 @@ "node", "cluster", "provider", + "api_kind", ], ) request_timing @@ -356,6 +427,7 @@ "node", "cluster", "provider", + "api_kind", ], timeColumn: "_start", ) @@ -373,6 +445,24 @@ "node", "cluster", "provider", + "api_kind", + ], + ) + request_timing + |> mean() + |> set(key: "_field", value: "timing_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", ], ) - @@ -396,6 +486,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -415,6 +506,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_timing @@ -432,6 +524,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_timing @@ -449,6 +542,25 @@ "node", "cluster", "provider", + "api_kind", + ], + ) + proxy_timing + |> mean() + |> set(key: "_field", value: "p_timing_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", ], 
) - @@ -472,6 +584,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -491,6 +604,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_status @@ -508,6 +622,25 @@ "node", "cluster", "provider", + "api_kind", + ], + timeColumn: "_start", + ) + proxy_status + |> filter(fn: (r) => r._value >= 200 and r._value < 300) + |> count() + |> set(key: "_field", value: "p_s2xx") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", ], timeColumn: "_start", ) @@ -527,6 +660,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_status @@ -545,6 +679,7 @@ "node", "cluster", "provider", + "api_kind", ], ) - @@ -568,6 +703,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -587,6 +723,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_request @@ -603,6 +740,7 @@ "node", "cluster", "provider", + "api_kind", ], timeColumn: "_start", ) @@ -621,6 +759,7 @@ "node", "cluster", "provider", + "api_kind", ], ) - @@ -644,6 +783,7 @@ "node", "cluster", "provider", + "api_kind", "_measurement", ], ) @@ -662,6 +802,7 @@ "node", "cluster", "provider", + "api_kind", ], ) proxy_response @@ -678,6 +819,7 @@ "node", "cluster", "provider", + "api_kind", ], timeColumn: "_start", ) @@ -696,158 +838,39 @@ "node", "cluster", "provider", + "api_kind", ], ) - - task_name: "apinto_hour_request_v1" - cron: "0 * * * *" - offset: "1m30s" + task_name: "apinto_minute_request_input_token_v1" + cron: "* * * * *" + offset: "28s" flux: | - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" - or - r._field == "timing" or r._field == "request" or r._field == "response" or r._field - == - "retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", 
- "provider", - "_field", - "_measurement", - ], - ) + request_input_token = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "input_token") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_measurement", + ], + ) + request_input_token |> sum() + |> set(key: "_field", value: "input_token") |> to( - bucket: "apinto/hour", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" - or - r._field == "retry_max", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - "_measurement", - ], - ) - |> max() - |> to( - bucket: "apinto/hour", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" - or - r._field == "retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - "_measurement", - ], - ) - |> max() - |> to( - bucket: "apinto/hour", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) -- - task_name: "apinto_hour_proxy_v1" - cron: "0 * * * *" - offset: "1m45s" - flux: | - - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - 
r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field - == - "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field - == - "p_response" or r._field == "p_retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - "_measurement", - ], - ) - |> sum() - |> to( - bucket: "apinto/hour", + bucket: "apinto/minute", timeColumn: "_start", tagColumns: [ "api", @@ -858,236 +881,227 @@ "node", "cluster", "provider", + "api_kind", + ], + ) + request_input_token + |> max() + |> set(key: "_field", value: "input_token_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + timeColumn: "_start", + ) + request_input_token + |> min() + |> set(key: "_field", value: "input_token_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + request_input_token + |> mean() + |> set(key: "_field", value: "input_token_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) +- + task_name: "apinto_minute_request_output_token_v1" + cron: "* * * * *" + offset: "30s" + flux: | + + request_output_token = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "output_token") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_measurement", + ], + ) + request_output_token + |> sum() + |> set(key: "_field", value: "output_token") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + 
"method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + request_output_token + |> max() + |> set(key: "_field", value: "output_token_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + timeColumn: "_start", + ) + request_output_token + |> min() + |> set(key: "_field", value: "output_token_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + request_output_token + |> mean() + |> set(key: "_field", value: "output_token_avg") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", ], ) - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_max" or r._field == "p_request_max" or r._field - == - "p_response_max" or r._field == "p_retry_max", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - "_measurement", - ], - ) - |> max() - |> to( - bucket: "apinto/hour", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) - - from(bucket: "apinto/minute") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_min" or r._field == "p_request_min" or r._field - == - "p_response_min" or r._field == "p_retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - "_measurement", - ], - ) - |> max() - |> to( - bucket: "apinto/hour", - timeColumn: 
"_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) - - task_name: "apinto_day_request_v1" - cron: "0 0 * * *" - offset: "2m30s" + task_name: "apinto_minute_request_total_token_v1" + cron: "* * * * *" + offset: "32s" flux: | - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" - or - r._field == "timing" or r._field == "request" or r._field == "response" or r._field - == - "retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) + request_total_token = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "total_token") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_measurement", + ], + ) + request_total_token |> sum() - |> set(key: "_measurement", value: "request") + |> set(key: "_field", value: "total_token") |> to( - bucket: "apinto/day", + bucket: "apinto/minute", timeColumn: "_start", tagColumns: [ "api", "app", "method", "upstream", + "addr", "node", "cluster", "provider", + "api_kind", ], ) - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" - or - r._field == "retry_max", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) + request_total_token |> max() - |> set(key: "_measurement", value: "request") + |> set(key: "_field", value: "total_token_max") |> to( - bucket: "apinto/day", - timeColumn: "_start", + bucket: "apinto/minute", 
tagColumns: [ "api", "app", "method", "upstream", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" - or - r._field == "retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "request") - |> to( - bucket: "apinto/day", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) -- - task_name: "apinto_day_proxy_v1" - cron: "0 0 * * *" - offset: "2m45s" - flux: | - - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field - == - "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field - == - "p_response" or r._field == "p_retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", "addr", - "method", "node", "cluster", "provider", - "_field", + "api_kind", ], + timeColumn: "_start", ) - |> sum() - |> set(key: "_measurement", value: "proxy") + request_total_token + |> min() + |> set(key: "_field", value: "total_token_min") |> to( - bucket: "apinto/day", + bucket: "apinto/minute", timeColumn: "_start", tagColumns: [ "api", @@ -1098,35 +1112,14 @@ "node", "cluster", "provider", + "api_kind", ], ) - - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_max" or r._field == "p_request_max" or r._field - == - "p_response_max" or r._field == "p_retry_max", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", 
- ], - ) - |> max() - |> set(key: "_measurement", value: "proxy") + request_total_token + |> mean() + |> set(key: "_field", value: "total_token_avg") |> to( - bucket: "apinto/day", + bucket: "apinto/minute", timeColumn: "_start", tagColumns: [ "api", @@ -1137,282 +1130,6 @@ "node", "cluster", "provider", + "api_kind", ], ) - - from(bucket: "apinto/hour") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_min" or r._field == "p_request_min" or r._field - == - "p_response_min" or r._field == "p_retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "proxy") - |> to( - bucket: "apinto/day", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) -- - task_name: "apinto_week_request_v1" - cron: "0 0 * * 1" - offset: "3m30s" - flux: | - - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" - or - r._field == "timing" or r._field == "request" or r._field == "response" or r._field - == - "retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> sum() - |> set(key: "_measurement", value: "request") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" - or - r._field == "retry_max", - ) - |> group( - 
columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "request") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "request") - |> filter( - fn: (r) => - r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" - or - r._field == "retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "request") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "node", - "cluster", - "provider", - ], - ) -- - task_name: "apinto_week_proxy_v1" - cron: "0 0 * * 1" - offset: "3m45s" - flux: | - - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field - == - "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field - == - "p_response" or r._field == "p_retry", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> sum() - |> set(key: "_measurement", value: "proxy") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_max" or r._field == "p_request_max" or r._field - == - "p_response_max" or r._field == "p_retry_max", - ) 
- |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "proxy") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) - from(bucket: "apinto/day") - |> range(start: -1w) - |> filter(fn: (r) => r._measurement == "proxy") - |> filter( - fn: (r) => - r._field == "p_timing_min" or r._field == "p_request_min" or r._field - == - "p_response_min" or r._field == "p_retry_min", - ) - |> group( - columns: [ - "api", - "app", - "upstream", - "addr", - "method", - "node", - "cluster", - "provider", - "_field", - ], - ) - |> max() - |> set(key: "_measurement", value: "proxy") - |> to( - bucket: "apinto/week", - timeColumn: "_start", - tagColumns: [ - "api", - "app", - "method", - "upstream", - "addr", - "node", - "cluster", - "provider", - ], - ) \ No newline at end of file diff --git a/module/monitor/driver/influxdb-v2/flux/tasks/week.yaml b/module/monitor/driver/influxdb-v2/flux/tasks/week.yaml new file mode 100644 index 00000000..a535ff37 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/tasks/week.yaml @@ -0,0 +1,292 @@ +- + task_name: "apinto_week_request_v1" + cron: "0 0 * * 1" + offset: "3m30s" + flux: | + + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + 
tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max" + or + r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min" + or + r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> min() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg" + or + r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + 
"cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> mean() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + "api_kind", + ], + ) +- + task_name: "apinto_week_proxy_v1" + cron: "0 0 * * 1" + offset: "3m45s" + flux: | + + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or 
r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "api_kind", + "_field", + ], + ) + |> min() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + "api_kind", + ], + ) \ No newline at end of file diff --git a/module/monitor/driver/influxdb-v2/flux/tasks_config.go b/module/monitor/driver/influxdb-v2/flux/tasks_config.go index 54b595a4..70725126 100644 --- a/module/monitor/driver/influxdb-v2/flux/tasks_config.go +++ b/module/monitor/driver/influxdb-v2/flux/tasks_config.go @@ -1,13 +1,18 @@ package flux import ( + "embed" _ "embed" + "fmt" + "strings" - "gopkg.in/yaml.v3" + "github.com/eolinker/eosc/log" + + yaml "gopkg.in/yaml.v3" ) -//go:embed influxdb_config/tasks.yaml -var tasksData []byte +//go:embed tasks/*.yaml +var taskReader embed.FS var ( taskList []*TaskConf @@ -22,9 +27,28 @@ type TaskConf struct { func initTasksConfig() { conf := make([]*TaskConf, 0, 15) - err := yaml.Unmarshal(tasksData, &conf) + files, err := taskReader.ReadDir("tasks") if err != nil { - panic(err) + panic(fmt.Sprintf("read tasks dir error: %v", err)) + } + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") { + continue + } + name := fmt.Sprintf("tasks/%s", file.Name()) + data, err := taskReader.ReadFile(name) + if err != nil { + log.Errorf("read file(%s) error: %v", name, err) + continue + } + tmp := make([]*TaskConf, 0, 15) + err = yaml.Unmarshal(data, &tmp) + if err != nil { + log.Errorf("unmarshal file(%s) error: %v", name, err) + continue + } + conf = append(conf, tmp...) 
// ChartOverview is an empty placeholder type; it declares no fields.
type ChartOverview struct {
}

// StatusCodeOverview holds one value per HTTP status class. It is used both
// for request counts and for traffic broken down by status class.
type StatusCodeOverview struct {
	Status2xx int64 `json:"2xx"` // value attributed to 2xx responses
	Status4xx int64 `json:"4xx"` // value attributed to 4xx responses
	Status5xx int64 `json:"5xx"` // value attributed to 5xx responses
}

// TokenOverview holds token usage split into input, output and total.
type TokenOverview struct {
	TotalToken  int64 `json:"total_token"`  // total tokens
	OutputToken int64 `json:"output_token"` // output tokens
	InputToken  int64 `json:"input_token"`  // input tokens
}

// ChartAIOverview is the overview chart payload for AI services. Slice fields
// carry one entry per point of the time series; string fields are
// pre-formatted summary values.
type ChartAIOverview struct {
	RequestOverview                 []*StatusCodeOverview `json:"request_overview"`                    // request counts by status class, per point
	AvgRequestPerSubscriber         string                `json:"avg_request_per_subscriber"`          // total requests divided by subscriber count
	AvgRequestPerSubscriberOverview []int64               `json:"avg_request_per_subscriber_overview"` // requests per subscriber, per point
	RequestTotal                    string                `json:"request_total"`                       // total requests
	TokenTotal                      string                `json:"token_total"`                         // total tokens
	TokenOverview                   []*TokenOverview      `json:"token_overview"`                      // token usage, per point
	AvgTokenOverview                []int64               `json:"avg_token_overview"`                  // tokens divided by the time interval, per point
	AvgTokenPerSubscriberOverview   []*TokenOverview      `json:"avg_token_per_subscriber_overview"`   // token usage per subscriber, per point
	AvgToken                        string                `json:"avg_token"`                           // average token rate
	MaxToken                        string                `json:"max_token"`                           // highest token rate
	MinToken                        string                `json:"min_token"`                           // lowest token rate
	AvgTokenPerSubscriber           string                `json:"avg_token_per_subscriber"`            // total tokens divided by subscriber count
	Date                            []string              `json:"date"`                                // formatted timestamps of the points
}

// ChartRestOverview is the overview chart payload for REST services. Slice
// fields carry one entry per point of the time series; string fields are
// pre-formatted summary values.
type ChartRestOverview struct {
	RequestOverview                 []*StatusCodeOverview `json:"request_overview"`                    // request counts by status class, per point
	AvgRequestPerSubscriber         string                `json:"avg_request_per_subscriber"`          // total requests divided by subscriber count
	AvgRequestPerSubscriberOverview []int64               `json:"avg_request_per_subscriber_overview"` // requests per subscriber, per point
	RequestTotal                    string                `json:"request_total"`                       // total requests
	TrafficOverview                 []*StatusCodeOverview `json:"traffic_overview"`                    // traffic by status class, per point
	AvgResponseTimeOverview         []int64               `json:"avg_response_time_overview"`          // average response time, per point
	AvgTrafficPerSubscriberOverview []int64               `json:"avg_traffic_per_subscriber_overview"` // traffic per subscriber, per point
	TrafficTotal                    string                `json:"traffic_total"`                       // total traffic
	AvgResponseTime                 string                `json:"avg_response_time"`                   // average response time
	MaxResponseTime                 string                `json:"max_response_time"`                   // maximum response time
	MinResponseTime                 string                `json:"min_response_time"`                   // minimum response time
	AvgTrafficPerSubscriber         string                `json:"avg_traffic_per_subscriber"`          // total traffic divided by subscriber count
	Date                            []string              `json:"date"`                                // formatted timestamps of the points
}

// TopN is one row of a top-N ranking. Traffic and Token are omitted from the
// JSON output when empty.
type TopN struct {
	Id      string `json:"id"`
	Name    string `json:"name"`
	Request string `json:"request"`
	Traffic string `json:"traffic,omitempty"`
	Token   string `json:"token,omitempty"`
}
// formatScaled renders v with decimal (x1000) unit suffixes taken from units,
// where units[0] is the suffix for unscaled values.
//
// Values below 1000 (including negative values) are printed as plain integers.
// Larger values are printed with one decimal place in the largest unit that
// keeps the number below 1000. When rounding to one decimal would produce
// "1000.0" the value is promoted to the next unit, so 999999 renders as
// "1.0M" rather than "1000.0K". The last unit is never promoted.
func formatScaled(v int64, units []string) string {
	last := len(units) - 1
	div, idx := int64(1), 0
	for idx < last && v/div >= 1000 {
		div *= 1000
		idx++
	}
	if idx == 0 {
		return strconv.FormatInt(v, 10) + units[0]
	}
	s := fmt.Sprintf("%.1f", float64(v)/float64(div))
	if s == "1000.0" && idx < last {
		div *= 1000
		idx++
		s = fmt.Sprintf("%.1f", float64(v)/float64(div))
	}
	return s + units[idx]
}

// formatCount renders a count in short human-readable form using the suffixes
// K (thousand), M (million), B (billion) and T (trillion), e.g. 1500 -> "1.5K".
// Counts below 1000 are returned unchanged as decimal integers.
func formatCount(count int64) string {
	return formatScaled(count, []string{"", "K", "M", "B", "T"})
}

// formatByte renders a byte size using decimal (1000-based) units from B up
// to PB, e.g. 1500 -> "1.5KB". Sizes below 1000 are printed as "<n>B".
func formatByte(b int64) string {
	return formatScaled(b, []string{"B", "KB", "MB", "GB", "TB", "PB"})
}

// Durations in seconds, and the usable age of data in each retention bucket
// (one day less than the 7-day, 90-day and 5-year figures they are built from).
const (
	oneMinute             = 60
	oneHour               = 3600
	oneDay                = 24 * oneHour
	tenDay                = 10 * oneDay
	oneYear               = 365 * oneDay
	bucketMinuteRetention = (7 - 1) * oneDay
	bucketHourRetention   = (90 - 1) * oneDay
	bucketDayRetention    = (5*365 - 1) * oneDay
)
getTimeIntervalAndBucket 根据start和end来获取窗口时间间隔,窗口偏移量offset,以及使用的bucket, 查询的startTime也会格式化 +func getTimeInterval(startTime, endTime time.Time) int64 { + startToNow := time.Now().Unix() - startTime.Unix() + + //结合可使用的最小桶,根据end-start时间间隔来得出合适的桶和趋势图时间间隔 + diff := endTime.Unix() - startTime.Unix() + if diff <= oneHour { + return 5 * oneMinute + } else if diff <= oneDay { + + switch { + case startToNow <= bucketHourRetention: + return oneHour + case startToNow <= bucketDayRetention: + return oneDay + default: + return 7 * oneDay + } + + } else if diff <= tenDay { + + switch { + case startToNow <= bucketHourRetention: + return oneHour + case startToNow <= bucketDayRetention: + return oneDay + default: + return 7 * oneDay + } + + } else { + return 7 * oneDay + } + +} diff --git a/module/monitor/iml.go b/module/monitor/iml.go index 63d127e4..0d4bee98 100644 --- a/module/monitor/iml.go +++ b/module/monitor/iml.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" "github.com/APIParkLab/APIPark/gateway" @@ -43,6 +44,374 @@ type imlMonitorStatisticModule struct { apiService api.IAPIService `autowired:""` } +func (i *imlMonitorStatisticModule) genOverviewWhere(ctx context.Context, serviceId string, apiKind []string) ([]monitor.MonWhereItem, error) { + clusterId := cluster.DefaultClusterID + _, err := i.clusterService.Get(ctx, clusterId) + if err != nil { + return nil, err + } + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, err + } + if serviceId != "" { + wheres = append(wheres, monitor.MonWhereItem{ + Key: "provider", + Operation: "=", + Values: []string{serviceId}, + }) + } + if len(apiKind) > 0 { + wheres = append(wheres, monitor.MonWhereItem{ + Key: "api_kind", + Operation: "in", + Values: apiKind, + }) + } + return wheres, nil +} + +func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) { + wheres, err := i.genOverviewWhere(ctx, 
serviceId, []string{"ai"}) + if err != nil { + return nil, err + } + executor, err := i.getExecutor(ctx, cluster.DefaultClusterID) + if err != nil { + return nil, err + } + + serviceIds := make([]string, 0) + // 从数据库中获取相关信息 + if serviceId == "" { + // 获取全部服务 + list, err := i.serviceService.ServiceListByKind(ctx, service.AIService) + if err != nil { + + return nil, err + } + serviceIds = utils.SliceToSlice(list, func(t *service.Service) string { + return t.Id + }) + } else { + serviceIds = append(serviceIds, serviceId) + } + appMap := make(map[string]struct{}) + for _, sId := range serviceIds { + items, err := i.subscribeService.ListBySubscribeStatus(ctx, sId, subscribe.ApplyStatusSubscribe) + if err != nil { + return nil, err + } + for _, item := range items { + appMap[item.Application] = struct{}{} + } + } + subscriberNum := int64(len(appMap)) + var wg sync.WaitGroup + wg.Add(2) + errChan := make(chan error, 2) + result := new(monitor_dto.ChartAIOverview) + go func() { + defer wg.Done() + date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres) + if err != nil { + errChan <- err + return + } + result.Date = utils.SliceToSlice(date, func(t time.Time) string { + return t.Format("2006/01/02 15:04") + }) + result.AvgRequestPerSubscriberOverview = make([]int64, 0, len(items)) + result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items)) + for _, item := range items { + result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, item.StatusTotal/subscriberNum) + result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{ + Status2xx: item.Status2xx, + Status4xx: item.Status4xx, + Status5xx: item.Status5xx, + }) + } + result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) + result.RequestTotal = formatCount(summary.StatusTotal) + }() + + go func() { + defer wg.Done() + startTime := formatTimeByMinute(start) 
+ endTime := formatTimeByMinute(end) + _, summary, items, err := executor.TokenOverview(ctx, startTime, endTime, wheres) + if err != nil { + errChan <- err + return + } + timeInterval := getTimeInterval(startTime, endTime) + result.TokenOverview = make([]*monitor_dto.TokenOverview, 0, len(items)) + result.AvgTokenOverview = make([]int64, 0, len(items)) + result.AvgTokenPerSubscriberOverview = make([]*monitor_dto.TokenOverview, 0, len(items)) + var maxToken, minToken int64 = 0, 0 + for _, item := range items { + if maxToken < item.TotalToken { + maxToken = item.TotalToken + } + if minToken == 0 || minToken > item.TotalToken { + minToken = item.TotalToken + } + result.TokenOverview = append(result.TokenOverview, &monitor_dto.TokenOverview{ + TotalToken: item.TotalToken, + OutputToken: item.OutputToken, + InputToken: item.InputToken, + }) + result.AvgTokenOverview = append(result.AvgTokenOverview, item.TotalToken/timeInterval) + result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenOverview{ + TotalToken: item.TotalToken / subscriberNum, + OutputToken: item.OutputToken / subscriberNum, + InputToken: item.InputToken / subscriberNum, + }) + + } + result.AvgTokenPerSubscriber = formatCount(summary.TotalToken / subscriberNum) + result.MaxToken = fmt.Sprintf("%s/s", formatCount(maxToken/timeInterval)) + result.MinToken = fmt.Sprintf("%s/s", formatCount(minToken/timeInterval)) + result.AvgToken = fmt.Sprintf("%s/s", formatCount(summary.OutputToken/timeInterval)) + result.TokenTotal = formatCount(summary.TotalToken) + }() + go func() { + wg.Wait() + close(errChan) + }() + errs := make([]error, 0, 2) + // 收集错误 + for err := range errChan { + errs = append(errs, err) + } + + if len(errs) > 0 { + return nil, fmt.Errorf("errors occurred: %v", errs) + } + return result, nil +} + +func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) 
{ + wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"rest"}) + if err != nil { + return nil, err + } + executor, err := i.getExecutor(ctx, cluster.DefaultClusterID) + if err != nil { + return nil, err + } + + serviceIds := make([]string, 0) + // 从数据库中获取相关信息 + if serviceId == "" { + // 获取全部服务 + list, err := i.serviceService.ServiceListByKind(ctx, service.RestService) + if err != nil { + return nil, err + } + serviceIds = utils.SliceToSlice(list, func(t *service.Service) string { + return t.Id + }) + } else { + serviceIds = append(serviceIds, serviceId) + } + appMap := make(map[string]struct{}) + for _, sId := range serviceIds { + items, err := i.subscribeService.ListBySubscribeStatus(ctx, sId, subscribe.ApplyStatusSubscribe) + if err != nil { + return nil, err + } + for _, item := range items { + appMap[item.Id] = struct{}{} + } + } + subscriberNum := int64(len(appMap)) + var wg sync.WaitGroup + wg.Add(3) + errChan := make(chan error, 2) + result := new(monitor_dto.ChartRestOverview) + go func() { + defer wg.Done() + date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres) + if err != nil { + errChan <- err + return + } + result.Date = utils.SliceToSlice(date, func(t time.Time) string { + return t.Format("2006/01/02 15:04") + }) + result.AvgRequestPerSubscriberOverview = make([]int64, 0, len(items)) + result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items)) + for _, item := range items { + result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, item.StatusTotal/int64(subscriberNum)) + result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{ + Status2xx: item.Status2xx, + Status4xx: item.Status4xx, + Status5xx: item.Status5xx, + }) + } + result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) + result.RequestTotal = formatCount(summary.StatusTotal) + }() + + go func() { + defer 
wg.Done() + startTime := formatTimeByMinute(start) + endTime := formatTimeByMinute(end) + _, summary, items, err := executor.AvgResponseTimeOverview(ctx, startTime, endTime, wheres) + if err != nil { + errChan <- err + return + } + result.AvgResponseTimeOverview = items + result.AvgResponseTime = fmt.Sprintf("%dms", summary.Avg) + result.MaxResponseTime = fmt.Sprintf("%dms", summary.Max) + result.MinResponseTime = fmt.Sprintf("%dms", summary.Min) + }() + + go func() { + defer wg.Done() + startTime := formatTimeByMinute(start) + endTime := formatTimeByMinute(end) + _, summary, items, err := executor.TrafficOverviewByStatusCode(ctx, startTime, endTime, wheres) + if err != nil { + errChan <- err + return + } + result.TrafficOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items)) + result.AvgTrafficPerSubscriberOverview = make([]int64, 0, len(items)) + for _, item := range items { + result.TrafficOverview = append(result.TrafficOverview, &monitor_dto.StatusCodeOverview{ + Status2xx: item.Status2xx, + Status4xx: item.Status4xx, + Status5xx: item.Status5xx, + }) + result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, item.StatusTotal/subscriberNum) + } + result.AvgTrafficPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) + }() + go func() { + wg.Wait() + close(errChan) + }() + errs := make([]error, 0, 3) + // 收集错误 + for err := range errChan { + errs = append(errs, err) + } + + if len(errs) > 0 { + return nil, fmt.Errorf("errors occurred: %v", errs) + } + return result, nil +} + +func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *monitor_dto.TopN { + n := &monitor_dto.TopN{ + Id: id, + Name: name, + Request: formatCount(item.Request), + } + switch apiKind { + case "rest": + n.Traffic = formatByte(item.Traffic) + case "ai": + n.Token = formatCount(item.Token) + } + return n +} + +func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, start int64, end int64, limit 
int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) { + wheres, err := i.genOverviewWhere(ctx, serviceId, []string{apiKind}) + if err != nil { + return nil, nil, err + } + + executor, err := i.getExecutor(ctx, cluster.DefaultClusterID) + if err != nil { + return nil, nil, err + } + + errChan := make(chan error, 2) + var wg sync.WaitGroup + apisResult, consumersResult := make([]*monitor_dto.TopN, 0), make([]*monitor_dto.TopN, 0) + var errs []error + + wg.Add(2) + + go func() { + defer wg.Done() + result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "api", wheres) + if err != nil { + errChan <- err + return + } + if len(result) < 1 { + return + } + apiIds := utils.SliceToSlice(result, func(t *monitor.TopN) string { + return t.Key + }) + apis, err := i.apiService.ListInfo(ctx, apiIds...) + if err != nil { + errChan <- err + return + } + apiMap := utils.SliceToMap(apis, func(t *api.Info) string { + return t.UUID + }) + for _, item := range result { + if v, ok := apiMap[item.Key]; ok { + apisResult = append(apisResult, generateTopN(v.UUID, v.Name, item, apiKind)) + } + } + }() + + go func() { + defer wg.Done() + result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "app", wheres) + if err != nil { + errChan <- err + return + } + if len(result) < 1 { + return + } + appIds := utils.SliceToSlice(result, func(t *monitor.TopN) string { + return t.Key + }) + apps, err := i.serviceService.AppList(ctx, appIds...) 
+ if err != nil { + errChan <- err + return + } + appMap := utils.SliceToMap(apps, func(t *service.Service) string { + return t.Id + }) + for _, item := range result { + if v, ok := appMap[item.Key]; ok { + consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind)) + } + } + }() + + // 收集所有错误 + go func() { + wg.Wait() + close(errChan) + }() + + // 收集错误 + for err := range errChan { + errs = append(errs, err) + } + + if len(errs) > 0 { + return nil, nil, fmt.Errorf("errors occurred: %v", errs) + } + return apisResult, consumersResult, nil +} + func (i *imlMonitorStatisticModule) ApiStatistics(ctx context.Context, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) { clusterId := cluster.DefaultClusterID _, err := i.clusterService.Get(ctx, clusterId) diff --git a/module/monitor/monitor.go b/module/monitor/monitor.go index ba897c70..5f0e140f 100644 --- a/module/monitor/monitor.go +++ b/module/monitor/monitor.go @@ -43,6 +43,10 @@ type IMonitorStatisticModule interface { ApiStatisticsOnProvider(ctx context.Context, providerId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) ApiStatisticsOnSubscriber(ctx context.Context, subscriberId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) SubscriberStatisticsOnApi(ctx context.Context, apiId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ServiceStatisticBasicItem, error) + + AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) + RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) + Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) } type IMonitorConfigModule interface { diff --git a/module/publish/iml.go b/module/publish/iml.go index b39db372..05e02899 100644 
// initGateway is currently a no-op: it returns nil immediately and uses none
// of its arguments.
//
// The disabled implementation is kept below for reference. It listed every
// service, loaded each service's release for the partition, and brought the
// release online through clientDriver.Project().Online, skipping services
// that have no release.
func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, clientDriver gateway.IClientDriver) error {
	return nil
	//projects, err := m.serviceService.List(ctx)
	//if err != nil {
	//	return err
	//}
	//projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
	//	return p.Id
	//})
	//for _, projectId := range projectIds {
	//	releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
	//	if err != nil {
	//		return err
	//	}
	//	if releaseInfo == nil {
	//		continue
	//	}
	//
	//	err = clientDriver.Project().Online(ctx, releaseInfo)
	//	if err != nil {
	//		return err
	//	}
	//}
	//return nil
}
if err != nil { @@ -140,6 +144,9 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri }, Path: a.Path, Methods: a.Methods, + Labels: map[string]string{ + "api_kind": serviceInfo.Kind.String(), + }, //Service: a.Upstream, } if hasUpstream { diff --git a/module/router/dto/output.go b/module/router/dto/output.go index ca5c488b..4f71f0cf 100644 --- a/module/router/dto/output.go +++ b/module/router/dto/output.go @@ -11,6 +11,7 @@ import ( type Item struct { Id string `json:"id"` + Name string `json:"name"` Methods []string `json:"methods"` Protocols []string `json:"protocols"` Path string `json:"request_path"` diff --git a/module/router/iml.go b/module/router/iml.go index a7f368fc..81aa3f2c 100644 --- a/module/router/iml.go +++ b/module/router/iml.go @@ -205,6 +205,7 @@ func (i *imlRouterModule) Search(ctx context.Context, keyword string, serviceId } return &router_dto.Item{ Id: item.UUID, + Name: item.Name, Methods: item.Methods, Protocols: protocols, Path: item.Path, diff --git a/plugins/core/monitor.go b/plugins/core/monitor.go index 64afa039..61975661 100644 --- a/plugins/core/monitor.go +++ b/plugins/core/monitor.go @@ -22,5 +22,10 @@ func (p *plugin) monitorStatisticApis() []pm3.Api { pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend", []string{"context", "rest:data_type", "query:id", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrend), pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend/:typ", []string{"context", "rest:data_type", "rest:typ", "query:api", "query:provider", "query:subscriber", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrendInner), pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/statistics/:typ", []string{"context", "rest:data_type", "rest:typ", "query:id", "body"}, []string{"statistics"}, p.monitorStatisticController.StatisticsInner), + + pm3.CreateApiWidthDoc(http.MethodGet, 
"/api/v1/monitor/overview/chart/rest", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartRestOverview, access.SystemAnalysisRunViewView), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/ai", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartAIOverview, access.SystemAnalysisRunViewView), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/rest", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.RestTopN, access.SystemAnalysisRunViewView), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/ai", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.AITopN, access.SystemAnalysisRunViewView), } } diff --git a/plugins/core/service.go b/plugins/core/service.go index 60a48a69..eb800f30 100644 --- a/plugins/core/service.go +++ b/plugins/core/service.go @@ -39,5 +39,9 @@ func (p *plugin) ServiceApis() []pm3.Api { pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/swagger/:id", p.serviceController.Swagger), pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/apidoc/:id", p.serviceController.Swagger), pm3.CreateApiSimple(http.MethodGet, "/api/v1/export/openapi/:id", p.serviceController.ExportSwagger), + + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/monitor/top10", []string{"context", "query:service", "query:start", "query:end"}, []string{"apis", "consumers"}, p.serviceController.Top10, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), + 
// StatusCodeOverview holds one value per HTTP status class plus a total.
type StatusCodeOverview struct {
	Status2xx   int64 // value attributed to 2xx responses
	Status4xx   int64 // value attributed to 4xx responses
	Status5xx   int64 // value attributed to 5xx responses
	StatusTotal int64 // overall total; presumably covers all status classes — TODO confirm against the executor
}

// TokenOverview holds token usage split into input, output and total.
type TokenOverview struct {
	InputToken  int64
	OutputToken int64
	TotalToken  int64
}

// TopN is one entry of a top-N ranking query.
type TopN struct {
	Key     string // identifier of the ranked item
	Request int64
	Token   int64
	Traffic int64
}

// Aggregate holds the maximum, minimum and average of a measured value.
type Aggregate struct {
	Max int64
	Min int64
	Avg int64
}