finish: monitor overview

This commit is contained in:
Liujian
2025-04-29 00:34:58 +08:00
parent a22759136e
commit cff536710e
23 changed files with 2346 additions and 747 deletions
+61
View File
@@ -2,6 +2,7 @@ package monitor
import (
"fmt"
"strconv"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
@@ -17,6 +18,66 @@ type imlMonitorStatisticController struct {
module monitor.IMonitorStatisticModule `autowired:""`
}
func (i *imlMonitorStatisticController) ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.RestChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.AIChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "ai")
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlMonitorStatisticController) RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "rest")
}
func (i *imlMonitorStatisticController) Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) {
switch dataType {
case monitor_dto.DataTypeApi:
+5
View File
@@ -22,6 +22,11 @@ type IMonitorStatisticController interface {
InvokeTrendInner(ctx *gin.Context, dataType string, typ string, api string, provider string, subscriber string, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error)
StatisticsInner(ctx *gin.Context, dataType string, typ string, id string, input *monitor_dto.StatisticInput) (interface{}, error)
ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error)
ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error)
AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigController interface {
+70 -14
View File
@@ -5,9 +5,13 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
ai_provider_local "github.com/APIParkLab/APIPark/ai-provider/local"
subscribe_dto "github.com/APIParkLab/APIPark/module/subscribe/dto"
@@ -66,20 +70,72 @@ var (
)
type imlServiceController struct {
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
transaction store.ITransaction `autowired:""`
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
monitorModule monitor.IMonitorStatisticModule `autowired:""`
transaction store.ITransaction `autowired:""`
}
func (i *imlServiceController) AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
return i.monitorModule.AIChartOverview(ctx, serviceId, s, e)
}
func (i *imlServiceController) RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
return i.monitorModule.RestChartOverview(ctx, serviceId, s, e)
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlServiceController) Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
if serviceId == "" {
return nil, nil, fmt.Errorf("serviceId is required")
}
info, err := i.module.Get(ctx, serviceId)
if err != nil {
return nil, nil, err
}
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
return i.monitorModule.Top(ctx, serviceId, s, e, 10, info.ServiceKind)
}
func (i *imlServiceController) QuickCreateAIService(ctx *gin.Context, input *service_dto.QuickCreateAIService) error {
+7
View File
@@ -3,6 +3,8 @@ package service
import (
"reflect"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
service_dto "github.com/APIParkLab/APIPark/module/service/dto"
"github.com/gin-gonic/gin"
@@ -32,6 +34,11 @@ type IServiceController interface {
Swagger(ctx *gin.Context)
ExportSwagger(ctx *gin.Context)
Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartAIOverview, error)
RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ChartRestOverview, error)
}
type IAppController interface {
+20
View File
@@ -21,4 +21,24 @@ type IExecutor interface {
InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error)
IBasicOverview
IRestOverview
IAIOverview
}
type IBasicOverview interface {
RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error)
}
type IRestOverview interface {
TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
}
type IAIOverview interface {
TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error)
}
+240 -6
View File
@@ -3,6 +3,7 @@ package influxdb_v2
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@@ -23,6 +24,9 @@ import (
"github.com/APIParkLab/APIPark/service/monitor"
)
var _ driver.IAIOverview = (*executor)(nil)
var _ driver.IRestOverview = (*executor)(nil)
func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) {
var data InfluxdbV2Config
err := json.Unmarshal([]byte(cfg), &data)
@@ -147,7 +151,7 @@ func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.T
fieldsConditions := []string{"request", "response"}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset)
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -166,9 +170,9 @@ func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Tim
filters := formatFilter(wheres)
proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"}
proxyConditions := []string{"p_total", "p_success", "p_s2xx", "p_s4xx", "p_s5xx"}
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -200,9 +204,9 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "success", "s4xx", "s5xx"}
requestConditions := []string{"total", "success", "2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset)
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -221,7 +225,7 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
proxyConditions := []string{"p_total", "p_success"}
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -361,3 +365,233 @@ func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, g
return resultMap, nil
}
func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"s2xx_request", "s4xx_request", "s5xx_request", "s2xx_response", "s4xx_response", "s5xx_response"}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
s2xxRequest := groupValues["s2xx_request"]
s4xxRequest := groupValues["s4xx_request"]
s5xxRequest := groupValues["s5xx_request"]
s2xxResponse := groupValues["s2xx_response"]
s4xxResponse := groupValues["s4xx_response"]
s5xxResponse := groupValues["s5xx_response"]
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
overview := new(monitor.StatusCodeOverview)
overview.Status2xx = s2xxRequest[i] + s2xxResponse[i]
overview.Status4xx = s4xxRequest[i] + s4xxResponse[i]
overview.Status5xx = s5xxRequest[i] + s5xxResponse[i]
overview.StatusTotal = overview.Status2xx + overview.Status4xx + overview.Status5xx
totalOverview.StatusTotal += overview.StatusTotal
totalOverview.Status2xx += overview.Status2xx
totalOverview.Status4xx += overview.Status4xx
totalOverview.Status5xx += overview.Status5xx
result = append(result, overview)
}
return dates, totalOverview, result, nil
}
func (e *executor) aggregateSummary(ctx context.Context, start time.Time, end time.Time, measurement string, bucket string, filters string, fields []string) (map[string]*monitor.Aggregate, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("fields is empty")
}
maxFields := make([]string, 0, len(fields))
minFields := make([]string, 0, len(fields))
avgFields := make([]string, 0, len(fields))
for _, field := range fields {
maxFields = append(maxFields, field+"_max")
minFields = append(minFields, field+"_min")
avgFields = append(avgFields, field+"_avg")
}
maxRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "max()",
Fields: maxFields,
})
if err != nil {
return nil, err
}
minRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "min()",
Fields: minFields,
})
if err != nil {
return nil, err
}
avgRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "mean()",
Fields: avgFields,
})
if err != nil {
return nil, err
}
result := make(map[string]*monitor.Aggregate)
for _, field := range fields {
a := new(monitor.Aggregate)
a.Avg = int64(avgRes[field+"_avg"].(float64))
a.Min = minRes[field+"_min"].(int64)
a.Max = maxRes[field+"_max"].(int64)
result[field] = a
}
return result, nil
}
func (e *executor) AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"timing_avg"}
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
if err != nil {
return nil, nil, nil, err
}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.AvgFn)
if err != nil {
return nil, nil, nil, err
}
timingAvg := groupValues["timing_avg"]
timingAvgLen := len(timingAvg)
result := make([]int64, 0, len(dates))
for i := range dates {
if timingAvgLen > i {
result = append(result, timingAvg[i])
}
}
return dates, agg["timing"], result, nil
}
func (e *executor) RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "s2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
total := requestValues["total"]
totalLen := len(total)
s2xx := requestValues["s2xx"]
s2xxLen := len(s2xx)
s4xx := requestValues["s4xx"]
s4xxLen := len(s4xx)
s5xx := requestValues["s5xx"]
s5xxLen := len(s5xx)
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
r := new(monitor.StatusCodeOverview)
if totalLen > i {
r.StatusTotal = total[i]
totalOverview.StatusTotal += r.StatusTotal
}
if s2xxLen > i {
r.Status2xx = s2xx[i]
totalOverview.Status2xx += r.Status2xx
}
if s4xxLen > i {
r.Status4xx = s4xx[i]
totalOverview.Status4xx += r.Status4xx
}
if s5xxLen > i {
r.Status5xx = s5xx[i]
totalOverview.Status5xx += r.Status5xx
}
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) {
filters := formatFilter(wheres)
newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end)
statisticsConf := []*flux.StatisticsFilterConf{
{
Measurement: "request",
AggregateFn: "sum()",
Fields: []string{"total", "request", "total_token"},
},
{
Measurement: "proxy",
AggregateFn: "sum()",
Fields: []string{"p_total"},
},
}
results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit)
if err != nil {
return nil, err
}
topN := make([]*monitor.TopN, 0, len(results))
for key, result := range results {
n := new(monitor.TopN)
n.Key = key
n.Request = result.Total
n.Token = result.TotalToken
n.Traffic = result.TotalRequest
topN = append(topN, n)
}
return topN, nil
}
func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total_token", "input_token", "output_token"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
total := requestValues["total_token"]
totalLen := len(total)
input := requestValues["input_token"]
inputLen := len(input)
output := requestValues["output_token"]
outputLen := len(output)
totalOverview := new(monitor.TokenOverview)
result := make([]*monitor.TokenOverview, 0, len(dates))
for i := range dates {
r := new(monitor.TokenOverview)
if totalLen > i {
r.TotalToken = total[i]
totalOverview.TotalToken += r.TotalToken
}
if inputLen > i {
r.InputToken = input[i]
totalOverview.InputToken += r.InputToken
}
if outputLen > i {
r.OutputToken = output[i]
totalOverview.OutputToken += r.OutputToken
}
result = append(result, r)
}
return dates, totalOverview, result, nil
}
+35 -11
View File
@@ -14,7 +14,7 @@ import (
type IFluxQuery interface {
CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error)
// CommonQueryOnce 查询只返回一条结果
CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error)
CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error)
@@ -61,6 +61,9 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI,
totalRequest := common.FmtIntFromInterface(maps["request"])
maxRequest := common.FmtIntFromInterface(maps["request_max"])
minRequest := common.FmtIntFromInterface(maps["request_min"])
totalToken := common.FmtIntFromInterface(maps["total_token"])
maxToken := common.FmtIntFromInterface(maps["total_token_max"])
minToken := common.FmtIntFromInterface(maps["total_token_min"])
resultMap[key] = &FluxStatistics{
Total: total,
@@ -73,6 +76,9 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI,
TotalRequest: totalRequest,
RequestMax: maxRequest,
RequestMin: minRequest,
TotalToken: totalToken,
TokenMax: maxToken,
TokenMin: minToken,
}
}
@@ -128,10 +134,10 @@ func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.Quer
return resultMap, nil
}
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) {
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) {
fieldConditions := f.assembleTendencyFieldCondition(dataFields)
//拼装请求
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset)
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset, fn)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
@@ -148,15 +154,12 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s
//初始返回内容
dates := make([]time.Time, 0, len(resultList))
resultMap := make(map[string][]int64, len(dataFields))
for _, field := range dataFields {
resultMap[field] = make([]int64, 0, len(resultList))
}
for _, res := range resultList {
for _, field := range dataFields {
resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field]))
}
t, _ := res["_time"].(time.Time)
dates = append(dates, t)
}
@@ -270,7 +273,7 @@ from(bucket: "%s")
}
return fmt.Sprintf(`
union(tables: [
union(tables: [
%s
])
|> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value")
@@ -278,20 +281,41 @@ union(tables: [
`, strings.Join(streams, ",\n"), groupBy, limitStr)
}
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string {
type AggregateFn string
const (
SumFn AggregateFn = "sum"
MaxFn AggregateFn = "max"
MinFn AggregateFn = "min"
AvgFn AggregateFn = "mean"
)
var (
fns = map[AggregateFn]struct{}{
SumFn: {},
MaxFn: {},
MinFn: {},
}
)
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every, windowOffset string, fn AggregateFn) string {
windowOffsetFlux := ""
if windowOffset != "" {
windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset)
}
if _, ok := fns[fn]; !ok {
fn = SumFn
}
return fmt.Sprintf(`from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
%s
|> group(columns: ["_field"])
|> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> aggregateWindow(every: %s, fn: %s, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table,
filters, fieldConditions, every, windowOffsetFlux)
filters, fieldConditions, every, string(fn), windowOffsetFlux)
}
@@ -2,22 +2,27 @@ package flux
// FluxStatistics flux统计通用字段
type FluxStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
ProxyTotal int64 `json:"p_total"` //转发总数
ProxySuccess int64 `json:"p_success"` //转发成功
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最大响应时间
MinTiming int64 `json:"timing_min"` //最响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最大流量
RequestMin int64 `json:"request_min"` //最流量
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"` //2xx
ProxyTotal int64 `json:"p_total"` //转发
ProxySuccess int64 `json:"p_success"` //转发成功数
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最响应时间
MinTiming int64 `json:"timing_min"` //最小响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最流量
RequestMin int64 `json:"request_min"` //最小流量
TotalToken int64 `json:"total_token"` //总token流量
TokenMax int64 `json:"total_token_max"` //最大token流量
TokenMin int64 `json:"total_token_min"` //最小token流量
}
// FluxWarnStatistics flux统计告警通用字段
type FluxWarnStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"`
S4xx int64 `json:"s4xx"`
S5xx int64 `json:"s5xx"`
ProxyTotal int64 `json:"p_total"` //转发总数
@@ -0,0 +1,294 @@
-
task_name: "apinto_day_request_v1"
cron: "0 0 * * *"
offset: "2m30s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
-
task_name: "apinto_day_proxy_v1"
cron: "0 0 * * *"
offset: "2m45s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
@@ -0,0 +1,294 @@
-
task_name: "apinto_hour_request_v1"
cron: "0 * * * *"
offset: "1m30s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> mean()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
-
task_name: "apinto_hour_proxy_v1"
cron: "0 * * * *"
offset: "1m45s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
@@ -0,0 +1,292 @@
-
task_name: "apinto_week_request_v1"
cron: "0 0 * * 1"
offset: "3m30s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
],
)
-
task_name: "apinto_week_proxy_v1"
cron: "0 0 * * 1"
offset: "3m45s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
],
)
@@ -1,13 +1,18 @@
package flux
import (
"embed"
_ "embed"
"fmt"
"strings"
"gopkg.in/yaml.v3"
"github.com/eolinker/eosc/log"
yaml "gopkg.in/yaml.v3"
)
//go:embed influxdb_config/tasks.yaml
var tasksData []byte
//go:embed tasks/*.yaml
var taskReader embed.FS
var (
taskList []*TaskConf
@@ -22,9 +27,28 @@ type TaskConf struct {
func initTasksConfig() {
conf := make([]*TaskConf, 0, 15)
err := yaml.Unmarshal(tasksData, &conf)
files, err := taskReader.ReadDir("tasks")
if err != nil {
panic(err)
panic(fmt.Sprintf("read tasks dir error: %v", err))
}
for _, file := range files {
if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") {
continue
}
name := fmt.Sprintf("tasks/%s", file.Name())
data, err := taskReader.ReadFile(name)
if err != nil {
log.Errorf("read file(%s) error: %v", name, err)
continue
}
tmp := make([]*TaskConf, 0, 15)
err = yaml.Unmarshal(data, &tmp)
if err != nil {
log.Errorf("unmarshal file(%s) error: %v", name, err)
continue
}
conf = append(conf, tmp...)
}
taskList = conf
}
+55
View File
@@ -138,3 +138,58 @@ type MonitorCluster struct {
Name string `json:"name"`
Enable bool `json:"enable"`
}
type ChartOverview struct {
}
type StatusCodeOverview struct {
Status2xx int64 `json:"2xx"` //状态码2xx数
Status4xx int64 `json:"4xx"`
Status5xx int64 `json:"5xx"` //状态码5xx数
}
type TokenOverview struct {
TotalToken int64 `json:"total_token"` //总token流量
OutputToken int64 `json:"output_token"`
InputToken int64 `json:"input_token"` //最小token流量
}
type ChartAIOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"`
AvgRequestPerSubscriber string `json:"avg_request_per_subscriber"` //请求概况
AvgRequestPerSubscriberOverview []int64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
RequestTotal string `json:"request_total"`
TokenTotal string `json:"token_total"` //总token流量
TokenOverview []*TokenOverview `json:"token_overview"` //token概况
AvgTokenOverview []int64 `json:"avg_token_overview"`
AvgTokenPerSubscriberOverview []*TokenOverview `json:"avg_token_per_subscriber_overview"`
AvgToken string `json:"avg_token"`
MaxToken string `json:"max_token"`
MinToken string `json:"min_token"`
AvgTokenPerSubscriber string `json:"avg_token_per_subscriber"`
Date []string `json:"date"`
}
type ChartRestOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"` //请求概况
AvgRequestPerSubscriber string `json:"avg_request_per_subscriber"`
AvgRequestPerSubscriberOverview []int64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
RequestTotal string `json:"request_total"`
TrafficOverview []*StatusCodeOverview `json:"traffic_overview"` //流量概况
AvgResponseTimeOverview []int64 `json:"avg_response_time_overview"` //平均响应时间概况
AvgTrafficPerSubscriberOverview []int64 `json:"avg_traffic_per_subscriber_overview"`
TrafficTotal string `json:"traffic_total"`
AvgResponseTime string `json:"avg_response_time"` //平均响应时间
MaxResponseTime string `json:"max_response_time"` //最大响应时间
MinResponseTime string `json:"min_response_time"` //最小响应时间
AvgTrafficPerSubscriber string `json:"avg_traffic_per_subscriber"`
Date []string `json:"date"`
}
type TopN struct {
Id string `json:"id"`
Name string `json:"name"`
Request string `json:"request"`
Traffic string `json:"traffic,omitempty"`
Token string `json:"token,omitempty"`
}
+94
View File
@@ -0,0 +1,94 @@
package monitor
import (
"fmt"
"strconv"
"time"
)
func formatCount(count int64) string {
switch {
case count < 1000:
return strconv.FormatInt(count, 10)
case count < 1000000:
return fmt.Sprintf("%.1fK", float64(count)/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", float64(count)/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
default:
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
}
}
func formatByte(b int64) string {
const (
KB = 1000
MB = KB * 1000
GB = MB * 1000
TB = GB * 1000
PB = TB * 1000
)
switch {
case b < KB:
return fmt.Sprintf("%dB", b)
case b < MB:
return fmt.Sprintf("%.1fKB", float64(b)/KB)
case b < GB:
return fmt.Sprintf("%.1fMB", float64(b)/MB)
case b < TB:
return fmt.Sprintf("%.1fGB", float64(b)/GB)
case b < PB:
return fmt.Sprintf("%.1fTB", float64(b)/TB)
default:
return fmt.Sprintf("%.1fPB", float64(b)/PB)
}
}
const (
oneMinute = 60
oneHour = 3600
oneDay = 24 * oneHour
tenDay = 10 * oneDay
oneYear = 365 * oneDay
bucketMinuteRetention = (7 - 1) * oneDay
bucketHourRetention = (90 - 1) * oneDay
bucketDayRetention = (5*365 - 1) * oneDay
)
// getTimeIntervalAndBucket 根据start和end来获取窗口时间间隔,窗口偏移量offset,以及使用的bucket, 查询的startTime也会格式化
func getTimeInterval(startTime, endTime time.Time) int64 {
startToNow := time.Now().Unix() - startTime.Unix()
//结合可使用的最小桶,根据end-start时间间隔来得出合适的桶和趋势图时间间隔
diff := endTime.Unix() - startTime.Unix()
if diff <= oneHour {
return 5 * oneMinute
} else if diff <= oneDay {
switch {
case startToNow <= bucketHourRetention:
return oneHour
case startToNow <= bucketDayRetention:
return oneDay
default:
return 7 * oneDay
}
} else if diff <= tenDay {
switch {
case startToNow <= bucketHourRetention:
return oneHour
case startToNow <= bucketDayRetention:
return oneDay
default:
return 7 * oneDay
}
} else {
return 7 * oneDay
}
}
+369
View File
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/APIParkLab/APIPark/gateway"
@@ -43,6 +44,374 @@ type imlMonitorStatisticModule struct {
apiService api.IAPIService `autowired:""`
}
func (i *imlMonitorStatisticModule) genOverviewWhere(ctx context.Context, serviceId string, apiKind []string) ([]monitor.MonWhereItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
if err != nil {
return nil, err
}
wheres, err := i.genCommonWheres(ctx, clusterId)
if err != nil {
return nil, err
}
if serviceId != "" {
wheres = append(wheres, monitor.MonWhereItem{
Key: "provider",
Operation: "=",
Values: []string{serviceId},
})
}
if len(apiKind) > 0 {
wheres = append(wheres, monitor.MonWhereItem{
Key: "api_kind",
Operation: "in",
Values: apiKind,
})
}
return wheres, nil
}
func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"ai"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
serviceIds := make([]string, 0)
// 从数据库中获取相关信息
if serviceId == "" {
// 获取全部服务
list, err := i.serviceService.ServiceListByKind(ctx, service.AIService)
if err != nil {
return nil, err
}
serviceIds = utils.SliceToSlice(list, func(t *service.Service) string {
return t.Id
})
} else {
serviceIds = append(serviceIds, serviceId)
}
appMap := make(map[string]struct{})
for _, sId := range serviceIds {
items, err := i.subscribeService.ListBySubscribeStatus(ctx, sId, subscribe.ApplyStatusSubscribe)
if err != nil {
return nil, err
}
for _, item := range items {
appMap[item.Application] = struct{}{}
}
}
subscriberNum := int64(len(appMap))
var wg sync.WaitGroup
wg.Add(2)
errChan := make(chan error, 2)
result := new(monitor_dto.ChartAIOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]int64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for _, item := range items {
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, item.StatusTotal/subscriberNum)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum)
result.RequestTotal = formatCount(summary.StatusTotal)
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
_, summary, items, err := executor.TokenOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
timeInterval := getTimeInterval(startTime, endTime)
result.TokenOverview = make([]*monitor_dto.TokenOverview, 0, len(items))
result.AvgTokenOverview = make([]int64, 0, len(items))
result.AvgTokenPerSubscriberOverview = make([]*monitor_dto.TokenOverview, 0, len(items))
var maxToken, minToken int64 = 0, 0
for _, item := range items {
if maxToken < item.TotalToken {
maxToken = item.TotalToken
}
if minToken == 0 || minToken > item.TotalToken {
minToken = item.TotalToken
}
result.TokenOverview = append(result.TokenOverview, &monitor_dto.TokenOverview{
TotalToken: item.TotalToken,
OutputToken: item.OutputToken,
InputToken: item.InputToken,
})
result.AvgTokenOverview = append(result.AvgTokenOverview, item.TotalToken/timeInterval)
result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenOverview{
TotalToken: item.TotalToken / subscriberNum,
OutputToken: item.OutputToken / subscriberNum,
InputToken: item.InputToken / subscriberNum,
})
}
result.AvgTokenPerSubscriber = formatCount(summary.TotalToken / subscriberNum)
result.MaxToken = fmt.Sprintf("%s/s", formatCount(maxToken/timeInterval))
result.MinToken = fmt.Sprintf("%s/s", formatCount(minToken/timeInterval))
result.AvgToken = fmt.Sprintf("%s/s", formatCount(summary.OutputToken/timeInterval))
result.TokenTotal = formatCount(summary.TotalToken)
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 2)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
return result, nil
}
func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"rest"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
serviceIds := make([]string, 0)
// 从数据库中获取相关信息
if serviceId == "" {
// 获取全部服务
list, err := i.serviceService.ServiceListByKind(ctx, service.RestService)
if err != nil {
return nil, err
}
serviceIds = utils.SliceToSlice(list, func(t *service.Service) string {
return t.Id
})
} else {
serviceIds = append(serviceIds, serviceId)
}
appMap := make(map[string]struct{})
for _, sId := range serviceIds {
items, err := i.subscribeService.ListBySubscribeStatus(ctx, sId, subscribe.ApplyStatusSubscribe)
if err != nil {
return nil, err
}
for _, item := range items {
appMap[item.Id] = struct{}{}
}
}
subscriberNum := int64(len(appMap))
var wg sync.WaitGroup
wg.Add(3)
errChan := make(chan error, 2)
result := new(monitor_dto.ChartRestOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]int64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for _, item := range items {
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, item.StatusTotal/int64(subscriberNum))
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum)
result.RequestTotal = formatCount(summary.StatusTotal)
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
_, summary, items, err := executor.AvgResponseTimeOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.AvgResponseTimeOverview = items
result.AvgResponseTime = fmt.Sprintf("%dms", summary.Avg)
result.MaxResponseTime = fmt.Sprintf("%dms", summary.Max)
result.MinResponseTime = fmt.Sprintf("%dms", summary.Min)
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
_, summary, items, err := executor.TrafficOverviewByStatusCode(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.TrafficOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
result.AvgTrafficPerSubscriberOverview = make([]int64, 0, len(items))
for _, item := range items {
result.TrafficOverview = append(result.TrafficOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, item.StatusTotal/subscriberNum)
}
result.AvgTrafficPerSubscriber = formatCount(summary.StatusTotal / subscriberNum)
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 3)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
return result, nil
}
func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *monitor_dto.TopN {
n := &monitor_dto.TopN{
Id: id,
Name: name,
Request: formatCount(item.Request),
}
switch apiKind {
case "rest":
n.Traffic = formatByte(item.Traffic)
case "ai":
n.Token = formatCount(item.Token)
}
return n
}
func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{apiKind})
if err != nil {
return nil, nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, nil, err
}
errChan := make(chan error, 2)
var wg sync.WaitGroup
apisResult, consumersResult := make([]*monitor_dto.TopN, 0), make([]*monitor_dto.TopN, 0)
var errs []error
wg.Add(2)
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "api", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
apiIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apis, err := i.apiService.ListInfo(ctx, apiIds...)
if err != nil {
errChan <- err
return
}
apiMap := utils.SliceToMap(apis, func(t *api.Info) string {
return t.UUID
})
for _, item := range result {
if v, ok := apiMap[item.Key]; ok {
apisResult = append(apisResult, generateTopN(v.UUID, v.Name, item, apiKind))
}
}
}()
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "app", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
appIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apps, err := i.serviceService.AppList(ctx, appIds...)
if err != nil {
errChan <- err
return
}
appMap := utils.SliceToMap(apps, func(t *service.Service) string {
return t.Id
})
for _, item := range result {
if v, ok := appMap[item.Key]; ok {
consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind))
}
}
}()
// 收集所有错误
go func() {
wg.Wait()
close(errChan)
}()
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, nil, fmt.Errorf("errors occurred: %v", errs)
}
return apisResult, consumersResult, nil
}
func (i *imlMonitorStatisticModule) ApiStatistics(ctx context.Context, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
+4
View File
@@ -43,6 +43,10 @@ type IMonitorStatisticModule interface {
ApiStatisticsOnProvider(ctx context.Context, providerId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
ApiStatisticsOnSubscriber(ctx context.Context, subscriberId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
SubscriberStatisticsOnApi(ctx context.Context, apiId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ServiceStatisticBasicItem, error)
AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error)
RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error)
Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigModule interface {
+29 -22
View File
@@ -66,28 +66,28 @@ type imlPublishModule struct {
func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, clientDriver gateway.IClientDriver) error {
return nil
projects, err := m.serviceService.List(ctx)
if err != nil {
return err
}
projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
return p.Id
})
for _, projectId := range projectIds {
releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
if err != nil {
return err
}
if releaseInfo == nil {
continue
}
err = clientDriver.Project().Online(ctx, releaseInfo)
if err != nil {
return err
}
}
return nil
//projects, err := m.serviceService.List(ctx)
//if err != nil {
// return err
//}
//projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
// return p.Id
//})
//for _, projectId := range projectIds {
// releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
// if err != nil {
// return err
// }
// if releaseInfo == nil {
// continue
// }
//
// err = clientDriver.Project().Online(ctx, releaseInfo)
// if err != nil {
// return err
// }
//}
//return nil
}
func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, commitId string) (*gateway.ProjectRelease, error) {
@@ -110,6 +110,10 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
strategyCommitIds = append(strategyCommitIds, c.Commit)
}
}
serviceInfo, err := m.serviceService.Get(ctx, projectID)
if err != nil {
return nil, err
}
apiInfos, err := m.apiService.ListInfo(ctx, apiIds...)
if err != nil {
@@ -140,6 +144,9 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
},
Path: a.Path,
Methods: a.Methods,
Labels: map[string]string{
"api_kind": serviceInfo.Kind.String(),
},
//Service: a.Upstream,
}
if hasUpstream {
+1
View File
@@ -11,6 +11,7 @@ import (
type Item struct {
Id string `json:"id"`
Name string `json:"name"`
Methods []string `json:"methods"`
Protocols []string `json:"protocols"`
Path string `json:"request_path"`
+1
View File
@@ -205,6 +205,7 @@ func (i *imlRouterModule) Search(ctx context.Context, keyword string, serviceId
}
return &router_dto.Item{
Id: item.UUID,
Name: item.Name,
Methods: item.Methods,
Protocols: protocols,
Path: item.Path,
+5
View File
@@ -22,5 +22,10 @@ func (p *plugin) monitorStatisticApis() []pm3.Api {
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend", []string{"context", "rest:data_type", "query:id", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrend),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend/:typ", []string{"context", "rest:data_type", "rest:typ", "query:api", "query:provider", "query:subscriber", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrendInner),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/statistics/:typ", []string{"context", "rest:data_type", "rest:typ", "query:id", "body"}, []string{"statistics"}, p.monitorStatisticController.StatisticsInner),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/rest", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartRestOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/ai", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartAIOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/rest", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.RestTopN, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/ai", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.AITopN, access.SystemAnalysisRunViewView),
}
}
+4
View File
@@ -39,5 +39,9 @@ func (p *plugin) ServiceApis() []pm3.Api {
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/swagger/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/apidoc/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/export/openapi/:id", p.serviceController.ExportSwagger),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/monitor/top10", []string{"context", "query:service", "query:start", "query:end"}, []string{"apis", "consumers"}, p.serviceController.Top10, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
}
}
+26
View File
@@ -157,3 +157,29 @@ type MonTrendValues struct {
Names []string
Values [][]interface{}
}
type StatusCodeOverview struct {
Status2xx int64
Status4xx int64
Status5xx int64
StatusTotal int64
}
type TokenOverview struct {
InputToken int64
OutputToken int64
TotalToken int64
}
type TopN struct {
Key string
Request int64
Token int64
Traffic int64
}
type Aggregate struct {
Max int64
Min int64
Avg int64
}