From 3a69a179bc1120dac2acf43879a6e5fe81d742c9 Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Wed, 21 Aug 2024 23:42:15 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9B=91=E6=8E=A7=E9=83=A8=E5=88=86=E8=BF=81?= =?UTF-8?q?=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + controller/monitor/iml.go | 77 + controller/monitor/statistic.go | 35 + go.mod | 9 +- go.sum | 16 +- module/dynamic-module/iml.go | 66 +- module/monitor/driver/driver.go | 24 + module/monitor/driver/influxdb-v2/executor.go | 363 +++++ .../driver/influxdb-v2/flux/buckets_config.go | 32 + .../monitor/driver/influxdb-v2/flux/flux.go | 349 ++++ .../flux/influxdb_config/buckets.yaml | 15 + .../flux/influxdb_config/tasks.yaml | 1418 +++++++++++++++++ .../monitor/driver/influxdb-v2/flux/init.go | 18 + .../driver/influxdb-v2/flux/monitor_flux.go | 37 + .../driver/influxdb-v2/flux/tasks_config.go | 34 + .../monitor/driver/influxdb-v2/influxdb-v2.go | 61 + module/monitor/driver/influxdb-v2/util.go | 189 +++ module/monitor/driver/manager.go | 61 + module/monitor/dto/input.go | 35 + module/monitor/dto/output.go | 134 ++ module/monitor/iml.go | 405 +++++ module/monitor/monitor.go | 48 + module/publish/dto/out.go | 6 +- module/service-diff/iml.go | 52 +- plugins/core/core.go | 18 +- plugins/core/monitor.go | 19 + service/monitor/iml.go | 257 +++ service/monitor/model.go | 159 ++ service/monitor/service.go | 53 + service/service/iml.go | 27 +- service/service/service.go | 3 +- service/service_diff/diff.go | 2 +- stores/monitor/model.go | 23 + stores/monitor/store.go | 22 + 34 files changed, 3983 insertions(+), 85 deletions(-) create mode 100644 controller/monitor/iml.go create mode 100644 controller/monitor/statistic.go create mode 100644 module/monitor/driver/driver.go create mode 100644 module/monitor/driver/influxdb-v2/executor.go create mode 100644 module/monitor/driver/influxdb-v2/flux/buckets_config.go create mode 100644 module/monitor/driver/influxdb-v2/flux/flux.go create mode 100644 module/monitor/driver/influxdb-v2/flux/influxdb_config/buckets.yaml create mode 100644 module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml create mode 100644 module/monitor/driver/influxdb-v2/flux/init.go create mode 100644 module/monitor/driver/influxdb-v2/flux/monitor_flux.go create mode 100644 module/monitor/driver/influxdb-v2/flux/tasks_config.go create mode 100644 module/monitor/driver/influxdb-v2/influxdb-v2.go create mode 100644 module/monitor/driver/influxdb-v2/util.go create mode 100644 module/monitor/driver/manager.go create mode 100644 module/monitor/dto/input.go create mode 100644 module/monitor/dto/output.go create mode 100644 module/monitor/iml.go create mode 100644 module/monitor/monitor.go create mode 100644 plugins/core/monitor.go create mode 100644 service/monitor/iml.go create mode 100644 service/monitor/model.go create mode 100644 service/monitor/service.go create mode 100644 stores/monitor/model.go create mode 100644 stores/monitor/store.go diff --git a/.gitignore b/.gitignore index 17943495..9b16619e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /config.yml /build/ /apipark +/aoplatform diff --git a/controller/monitor/iml.go b/controller/monitor/iml.go new file mode 100644 index 00000000..da303824 --- /dev/null +++ b/controller/monitor/iml.go @@ -0,0 +1,77 @@ +package monitor + +import ( + "fmt" + "time" + + "github.com/APIParkLab/APIPark/module/monitor" + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" + "github.com/gin-gonic/gin" +) + +var ( + _ IMonitorStatisticController = (*imlMonitorStatisticController)(nil) +) + +type imlMonitorStatisticController struct { + module monitor.IMonitorStatisticModule `autowired:""` +} + +func (i *imlMonitorStatisticController) OverviewMessageTrend(ctx *gin.Context, input *monitor_dto.CommonInput) ([]time.Time, []float64, []float64, string, error) { + trend, timeInterval, err := i.module.MessageTrend(ctx, input) + if err != nil { + return nil, nil, nil, "", err + } + + return trend.Dates, trend.ReqMessage, trend.RespMessage, timeInterval, nil +} + +func (i *imlMonitorStatisticController) OverviewInvokeTrend(ctx *gin.Context, input *monitor_dto.CommonInput) ([]time.Time, []int64, []int64, []int64, []int64, []float64, []float64, string, error) { + trend, timeInterval, err := i.module.InvokeTrend(ctx, input) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, "", err + } + + return trend.Date, trend.RequestTotal, trend.ProxyTotal, trend.Status4XX, trend.Status5XX, trend.RequestRate, trend.ProxyRate, timeInterval, nil +} + +func (i *imlMonitorStatisticController) Summary(ctx *gin.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, *monitor_dto.MonSummaryOutput, error) { + requestSummary, err := i.module.RequestSummary(ctx, input) + if err != nil { + return nil, nil, err + } + proxySummary, err := i.module.ProxySummary(ctx, input) + if err != nil { + return nil, nil, err + } + return requestSummary, proxySummary, nil +} + +func (i *imlMonitorStatisticController) Top10(ctx *gin.Context, input *monitor_dto.Top10Input) (interface{}, error) { + switch input.DataType { + case monitor_dto.DataTypeApi: + return i.module.TopAPIStatistics(ctx, 10, input.CommonInput) + case monitor_dto.DataTypeProvider: + return i.module.TopProviderStatistics(ctx, 10, input.CommonInput) + case monitor_dto.DataTypeSubscriber: + return i.module.TopSubscriberStatistics(ctx, 10, input.CommonInput) + default: + return nil, fmt.Errorf("unsupported data type: %s", input.DataType) + } +} + +var ( + _ IMonitorConfigController = (*imlMonitorConfig)(nil) +) + +type imlMonitorConfig struct { + module monitor.IMonitorConfigModule `autowired:""` +} + +func (p *imlMonitorConfig) SaveMonitorConfig(ctx *gin.Context, cfg *monitor_dto.SaveMonitorConfig) (*monitor_dto.MonitorConfig, error) { + return p.module.SaveMonitorConfig(ctx, cfg) +} + +func (p *imlMonitorConfig) GetMonitorConfig(ctx *gin.Context) (*monitor_dto.MonitorConfig, error) { + return p.module.GetMonitorConfig(ctx) +} diff --git a/controller/monitor/statistic.go b/controller/monitor/statistic.go new file mode 100644 index 00000000..019c0370 --- /dev/null +++ b/controller/monitor/statistic.go @@ -0,0 +1,35 @@ +package monitor + +import ( + "reflect" + "time" + + "github.com/eolinker/go-common/autowire" + "github.com/gin-gonic/gin" + + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" +) + +type IMonitorStatisticController interface { + Top10(ctx *gin.Context, input *monitor_dto.Top10Input) (interface{}, error) + Summary(ctx *gin.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, *monitor_dto.MonSummaryOutput, error) + OverviewInvokeTrend(ctx *gin.Context, input *monitor_dto.CommonInput) ([]time.Time, []int64, []int64, []int64, []int64, []float64, []float64, string, error) + OverviewMessageTrend(ctx *gin.Context, input *monitor_dto.CommonInput) ([]time.Time, []float64, []float64, string, error) + + //Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) +} + +type IMonitorConfigController interface { + SaveMonitorConfig(ctx *gin.Context, cfg *monitor_dto.SaveMonitorConfig) (*monitor_dto.MonitorConfig, error) + GetMonitorConfig(ctx *gin.Context) (*monitor_dto.MonitorConfig, error) +} + +func init() { + autowire.Auto[IMonitorStatisticController](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorStatisticController)) + }) + + autowire.Auto[IMonitorConfigController](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorConfig)) + }) +} diff --git a/go.mod b/go.mod index c667d359..50e886be 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.4 github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 + github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/urfave/cli/v2 v2.27.2 golang.org/x/crypto v0.24.0 gopkg.in/yaml.v3 v3.0.1 @@ -18,6 +19,7 @@ require ( ) require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -36,6 +38,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -45,6 +48,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/redis/go-redis/v9 v9.5.3 // indirect @@ -66,5 +70,6 @@ require ( gorm.io/driver/mysql v1.5.2 // indirect ) -//replace github.com/eolinker/ap-account => ../../eolinker/ap-account -//replace github.com/eolinker/go-common => ../../eolinker/go-common +replace github.com/eolinker/ap-account => ../../eolinker/ap-account + +replace github.com/eolinker/go-common => ../../eolinker/go-common diff --git a/go.sum b/go.sum index 813d7dc4..1b2fb173 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -23,12 +27,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/eolinker/ap-account v1.0.9 h1:tW345b1wsn0V8pfMMlZOMfpbjxQhMWBVfgsClFTJGLk= -github.com/eolinker/ap-account v1.0.9/go.mod h1:5lsZwkQfnHO5YJ3Cu6X1PZwZ0gbmJBUcix0hxG8aEsY= github.com/eolinker/eosc v0.17.3 h1:sr2yT+v/AsqEdciRaaZZj0zL9pTufR5RvDW6+65hraQ= github.com/eolinker/eosc v0.17.3/go.mod h1:xgq816hpanlMXFtZw7Ztdctb1eEk9UPHchY4NfFO6Cw= -github.com/eolinker/go-common v1.0.4 h1:F0akjnzJfIFOVmK30fD0SsCLU7DAKPXuY21MeyMmQ7w= -github.com/eolinker/go-common v1.0.4/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4= github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I= github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= @@ -58,12 +58,17 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -82,6 +87,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -92,6 +99,7 @@ github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRci github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/module/dynamic-module/iml.go b/module/dynamic-module/iml.go index 35eb60cf..d2a668b9 100644 --- a/module/dynamic-module/iml.go +++ b/module/dynamic-module/iml.go @@ -6,23 +6,23 @@ import ( "fmt" "strings" "time" - + "github.com/eolinker/eosc/log" - + "github.com/APIParkLab/APIPark/gateway" - + "github.com/eolinker/go-common/store" - + "github.com/eolinker/ap-account/service/user" - + "github.com/APIParkLab/APIPark/service/cluster" - + "github.com/eolinker/go-common/utils" - + "github.com/google/uuid" - + "github.com/APIParkLab/APIPark/module/dynamic-module/driver" - + dynamic_module_dto "github.com/APIParkLab/APIPark/module/dynamic-module/dto" dynamic_module "github.com/APIParkLab/APIPark/service/dynamic-module" ) @@ -50,7 +50,7 @@ func (i *imlDynamicModule) Online(ctx context.Context, module string, id string) //if len(clusterInput.Clusters) == 0 { // return fmt.Errorf("上线分区失败,分区为空") //} - + id = strings.ToLower(fmt.Sprintf("%s_%s", id, module)) info, err := i.dynamicModuleService.Get(ctx, id) if err != nil { @@ -60,10 +60,10 @@ func (i *imlDynamicModule) Online(ctx context.Context, module string, id string) if err != nil || len(clusters) == 0 { return fmt.Errorf("上线失败,集群不存在") } - + return i.transaction.Transaction(ctx, func(ctx context.Context) error { for _, c := range clusters { - + // 插入发布历史 err = i.dynamicModulePublishService.Create(ctx, &dynamic_module.CreateDynamicModulePublish{ ID: uuid.New().String(), @@ -95,9 +95,9 @@ func (i *imlDynamicModule) Online(ctx context.Context, module string, id string) if err != nil { return err } - + } - + return nil }) } @@ -127,7 +127,7 @@ func (i *imlDynamicModule) Offline(ctx context.Context, module string, id string if err != nil { return err } - + } return nil }) @@ -185,7 +185,7 @@ func (i *imlDynamicModule) Offline(ctx context.Context, module string, id string func (i *imlDynamicModule) dynamicClient(ctx context.Context, clusterId string, resource string, h func(gateway.IDynamicClient) error) error { client, err := i.clusterService.GatewayClient(ctx, clusterId) - + if err != nil { return err } @@ -213,7 +213,7 @@ func (i *imlDynamicModule) dynamicClient(ctx context.Context, clusterId string, // if err != nil { // return nil, err // } -// partitionIds := utils.SliceToSlice(partitions, func(s *partition.Partition) string { +// partitionIds := utils.SliceToSlice(partitions, func(s *partition.Cluster) string { // return s.UUID // }) // suffix := fmt.Sprintf("_%s", module) @@ -278,7 +278,7 @@ func (i *imlDynamicModule) ModuleDrivers(ctx context.Context, group string) ([]* Title: s.Title(), Path: s.Front(), } - + }), nil } @@ -295,9 +295,9 @@ func (i *imlDynamicModule) PluginInfo(ctx context.Context, module string, cluste if !has { return nil, fmt.Errorf("module %s not found", module) } - + fields := make([]*driver.Field, 0, 1) - + fields = append(fields, &driver.Field{ Name: "status", Title: fmt.Sprintf("状态"), @@ -336,7 +336,7 @@ func (i *imlDynamicModule) Create(ctx context.Context, module string, input *dyn if !has { return nil, fmt.Errorf("module %s not found", module) } - + id := strings.ToLower(fmt.Sprintf("%s_%s", input.Id, module)) err := i.transaction.Transaction(ctx, func(ctx context.Context) error { cfg, err := json.Marshal(input.Config) @@ -358,7 +358,7 @@ func (i *imlDynamicModule) Create(ctx context.Context, module string, input *dyn if err != nil { return nil, err } - + return i.Get(ctx, module, input.Id) } @@ -402,10 +402,10 @@ func (i *imlDynamicModule) Delete(ctx context.Context, module string, ids []stri return err } } - + return nil }) - + } func (i *imlDynamicModule) Get(ctx context.Context, module string, id string) (*dynamic_module_dto.DynamicModule, error) { @@ -413,7 +413,7 @@ func (i *imlDynamicModule) Get(ctx context.Context, module string, id string) (* if !strings.HasSuffix(id, suffix) { id = strings.ToLower(fmt.Sprintf("%s_%s", id, module)) } - + info, err := i.get(ctx, module, id) if err != nil { return nil, err @@ -437,7 +437,7 @@ func (i *imlDynamicModule) get(ctx context.Context, module string, id string) (* if err != nil { return nil, err } - + if info.Module != module { return nil, fmt.Errorf("module not match") } @@ -455,7 +455,7 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri if err != nil { return nil, 0, err } - + userIDs := utils.SliceToSlice(list, func(s *dynamic_module.DynamicModule) string { return s.Updater }) @@ -463,7 +463,7 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri if err != nil { return nil, 0, err } - + userMap := i.userService.GetLabels(ctx, userIDs...) items := make([]map[string]interface{}, 0, len(list)) suffix := fmt.Sprintf("_%s", module) @@ -478,7 +478,7 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri for _, l := range list { status := "未发布" id := strings.TrimSuffix(l.ID, suffix) - + item := map[string]interface{}{ "id": id, "title": l.Name, @@ -487,7 +487,7 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri "updater": userMap[l.Updater], "update_time": l.UpdateAt.Format("2006-01-02 15:04:05"), } - + tmp := make(map[string]interface{}) err = json.Unmarshal([]byte(l.Config), &tmp) if err == nil { @@ -496,7 +496,7 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri continue } item[column] = tmp[column] - + } } if versions != nil { @@ -516,8 +516,8 @@ func (i *imlDynamicModule) List(ctx context.Context, module string, keyword stri if err != nil { return nil, 0, err } - + } - + return items, total, nil } diff --git a/module/monitor/driver/driver.go b/module/monitor/driver/driver.go new file mode 100644 index 00000000..a4b8d58e --- /dev/null +++ b/module/monitor/driver/driver.go @@ -0,0 +1,24 @@ +package driver + +import ( + "context" + "time" + + "github.com/APIParkLab/APIPark/service/monitor" +) + +type IDriver interface { + Name() string + Check(cfg string) error + Create(cfg string) (IExecutor, error) +} + +type IExecutor interface { + Init(ctx context.Context) error + CommonStatistics(ctx context.Context, start, end time.Time, groupBy string, limit int, wheres []monitor.MonWhereItem) (map[string]monitor.MonCommonData, error) + RequestSummary(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.Summary, error) + ProxySummary(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.Summary, error) + InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) + ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) + MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error) +} diff --git a/module/monitor/driver/influxdb-v2/executor.go b/module/monitor/driver/influxdb-v2/executor.go new file mode 100644 index 00000000..21077e67 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/executor.go @@ -0,0 +1,363 @@ +package influxdb_v2 + +import ( + "context" + "encoding/json" + "strings" + "time" + + "github.com/eolinker/eosc/log" + + "github.com/eolinker/go-common/utils" + "github.com/influxdata/influxdb-client-go/v2/domain" + + "github.com/APIParkLab/APIPark/common" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + + "github.com/APIParkLab/APIPark/module/monitor/driver" + "github.com/APIParkLab/APIPark/module/monitor/driver/influxdb-v2/flux" + + "github.com/influxdata/influxdb-client-go/v2/api" + + "github.com/APIParkLab/APIPark/service/monitor" +) + +func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) { + var data InfluxdbV2Config + err := json.Unmarshal([]byte(cfg), &data) + if err != nil { + return nil, err + } + client := influxdb2.NewClient(data.Addr, data.Token) + + return &executor{cfg: data, openApi: client.QueryAPI(data.Org), client: client, fluxQuery: fluxQuery}, nil +} + +type executor struct { + fluxQuery flux.IFluxQuery + cfg InfluxdbV2Config + openApi api.QueryAPI + client influxdb2.Client +} + +func (e *executor) Init(ctx context.Context) error { + orgInfo, err := e.client.OrganizationsAPI().FindOrganizationByName(ctx, e.cfg.Org) + if err != nil { + return err + } + orgID := *orgInfo.Id + //初始化bucket + bucketsAPI := e.client.BucketsAPI() + buckets, err := bucketsAPI.FindBucketsByOrgName(ctx, e.cfg.Org) + if err != nil { + return err + } + + bucketsConf := flux.GetBucketConfigList() + //要创建的bucket + toCreateBuckets := utils.SliceToMap(bucketsConf, func(t *flux.BucketConf) string { + return t.BucketName + }) + if buckets != nil { + for _, bucket := range *buckets { + if _, has := toCreateBuckets[bucket.Name]; has { + delete(toCreateBuckets, bucket.Name) + } + } + } + expire := domain.RetentionRuleTypeExpire + rule := domain.RetentionRule{ + ShardGroupDurationSeconds: nil, + Type: &expire, + } + //创建bucket + for _, bucketConf := range toCreateBuckets { + rule.EverySeconds = bucketConf.Retention + _, err := e.client.BucketsAPI().CreateBucketWithNameWithID(ctx, orgID, bucketConf.BucketName, rule) + if err != nil { + return err + } + log.Infof("Save bucket %s success. organization: %s", bucketConf.BucketName, e.cfg.Org) + } + + //创建定时脚本 + tasksApi := e.client.TasksAPI() + taskFilter := &api.TaskFilter{ + OrgID: orgID, + } + existedTasks, err := tasksApi.FindTasks(ctx, taskFilter) + if err != nil { + return err + } + tasksConf := flux.GetTaskConfigList() + //要创建的bucket + toCreateTasks := utils.SliceToMap(tasksConf, func(t *flux.TaskConf) string { + return t.TaskName + }) + toDeleteTaskIDs := make([]string, 0, len(toCreateTasks)) + + /* + 将influxDB已存在的定时脚本 与 定时脚本配置的进行对比 + 1. 配置和influxDB均有则不创建 + 2. 配置有,influxDB没有,则创建 + 3. 配置没有,influxDB有,且是apinto开头, 则删除 + */ + for _, task := range existedTasks { + if _, has := toCreateTasks[task.Name]; has { + delete(toCreateTasks, task.Name) + } else { + if strings.HasPrefix(task.Name, "apinto") { + toDeleteTaskIDs = append(toDeleteTaskIDs, task.Id) + } + } + } + //删除旧的apinto定时脚本 + for _, delId := range toDeleteTaskIDs { + err = tasksApi.DeleteTaskWithID(ctx, delId) + if err != nil { + return err + } + } + + //创建influxDB中没有的定时脚本 + for _, taskConf := range toCreateTasks { + newTask := &domain.Task{ + Cron: &taskConf.Cron, + Flux: taskConf.Flux, + Name: taskConf.TaskName, + Offset: &taskConf.Offset, + OrgID: orgID, + //Status: nil, + } + _, err := tasksApi.CreateTask(ctx, newTask) + if err != nil { + return err + } + log.Infof("Save task %s success. organization: %s", taskConf.TaskName, e.cfg.Org) + } + + return nil +} + +func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + + filters := formatFilter(wheres) + + fieldsConditions := []string{"request", "response"} + + dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset) + if err != nil { + return nil, "", err + } + + resultVal := &monitor.MonMessageTrend{ + Dates: dates, + ReqMessage: formatMessageTrendData(groupValues["request"]), + RespMessage: formatMessageTrendData(groupValues["response"]), + } + + return resultVal, every, nil +} + +func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + + filters := formatFilter(wheres) + + proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"} + + dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset) + if err != nil { + return nil, "", err + } + proxyRate := make([]float64, 0, len(dates)) + //计算请求成功率 + proxyTotal := proxyValues["p_total"] + proxySuccess := proxyValues["p_success"] + for i, total := range proxyTotal { + if total == 0 { + proxyRate = append(proxyRate, 0) + continue + } + rate := FormatFloat64(float64(proxySuccess[i])/float64(total), 4) + proxyRate = append(proxyRate, rate) + } + + resultVal := &monitor.MonInvokeCountTrend{ + Date: dates, + Status5XX: proxyValues["p_s5xx"], + Status4XX: proxyValues["p_s4xx"], + ProxyRate: proxyRate, + ProxyTotal: proxyValues["p_total"], + } + + return resultVal, every, nil +} + +func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error) { + newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end) + filters := formatFilter(wheres) + + requestConditions := []string{"total", "success", "s4xx", "s5xx"} + + dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset) + if err != nil { + return nil, "", err + } + requestRate := make([]float64, 0, len(dates)) + //计算请求成功率 + requestTotal := requestValues["total"] + requestSuccess := requestValues["success"] + for i, total := range requestTotal { + if total == 0 { + requestRate = append(requestRate, 0) + continue + } + rate := FormatFloat64(float64(requestSuccess[i])/float64(total), 4) + requestRate = append(requestRate, rate) + } + + proxyConditions := []string{"p_total", "p_success"} + + _, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset) + if err != nil { + return nil, "", err + } + //计算转发成功率 + proxyTotal := proxyValues["p_total"] + proxySuccess := proxyValues["p_success"] //proxySuccess和proxyTotal必定等长 + proxyRate := make([]float64, 0, len(proxyTotal)) + for i, total := range proxyTotal { + if total == 0 { + proxyRate = append(proxyRate, 0) + continue + } + rate := FormatFloat64(float64(proxySuccess[i])/float64(total), 4) + proxyRate = append(proxyRate, rate) + } + resultVal := &monitor.MonInvokeCountTrend{ + Date: dates, + Status5XX: requestValues["s5xx"], + Status4XX: requestValues["s4xx"], + ProxyRate: proxyRate, + ProxyTotal: proxyValues["p_total"], + RequestRate: requestRate, + RequestTotal: requestValues["total"], + } + + return resultVal, every, nil +} + +func (e *executor) summary(ctx context.Context, start, end time.Time, bucket string, filters string, filterCfg *flux.StatisticsFilterConf, prefix string) (*monitor.Summary, error) { + //获取请求表的饼状图 + queryOnce, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, filterCfg) + if err != nil { + return nil, err + } + summary := new(monitor.Summary) + if v, ok := queryOnce[prefix+"s4xx"]; ok { + summary.Status4Xx = common.FmtIntFromInterface(v) + } + if v, ok := queryOnce[prefix+"s5xx"]; ok { + summary.Status5Xx = common.FmtIntFromInterface(v) + } + if v, ok := queryOnce[prefix+"success"]; ok { + summary.Success = common.FmtIntFromInterface(v) + } + if v, ok := queryOnce[prefix+"total"]; ok { + summary.Total = common.FmtIntFromInterface(v) + } + summary.Fail = summary.Total - summary.Success + return summary, nil +} + +func (e *executor) RequestSummary(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.Summary, error) { + newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end) + return e.summary(ctx, newStartTime, end, bucket, formatFilter(wheres), &flux.StatisticsFilterConf{ + Measurement: "request", + AggregateFn: "sum()", + Fields: []string{"total", "success", "s4xx", "s5xx"}, + }, "") +} + +func (e *executor) ProxySummary(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.Summary, error) { + newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end) + return e.summary(ctx, newStartTime, end, bucket, formatFilter(wheres), &flux.StatisticsFilterConf{ + Measurement: "proxy", + AggregateFn: "sum()", + Fields: []string{"p_total", "p_success", "p_s4xx", "p_s5xx"}, + }, "p_") +} + +func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, groupBy string, limit int, wheres []monitor.MonWhereItem) (map[string]monitor.MonCommonData, error) { + filters := formatFilter(wheres) + newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end) + + statisticsConf := []*flux.StatisticsFilterConf{ + { + Measurement: "request", + AggregateFn: "sum()", + Fields: []string{"total", "success", "timing", "request"}, + }, + { + Measurement: "proxy", + AggregateFn: "sum()", + Fields: []string{"p_total", "p_success"}, + }, + { + Measurement: "request", + AggregateFn: "max()", + Fields: []string{"timing_max", "request_max"}, + }, { + Measurement: "request", + AggregateFn: "min()", + Fields: []string{"timing_min", "request_min"}, + }, + } + + results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit) + if err != nil { + return nil, err + } + resultMap := make(map[string]monitor.MonCommonData) + for key, result := range results { + + requestRate := 0.0 + if result.Total == 0 { + requestRate = 0.0 + } else { + requestRate = FormatFloat64(float64(result.Success)/float64(result.Total), 4) + } + + proxyRate := 0.0 + if result.ProxyTotal == 0 { + proxyRate = 0.0 + } else { + proxyRate = FormatFloat64(float64(result.ProxySuccess)/float64(result.ProxyTotal), 4) + } + + monCommonData := monitor.MonCommonData{ + ID: key, + RequestTotal: result.Total, + RequestSuccess: result.Success, + RequestRate: requestRate, + ProxyTotal: result.ProxyTotal, + ProxySuccess: result.ProxySuccess, + ProxyRate: proxyRate, + StatusFail: result.Total - result.Success, + AvgResp: float64(result.TotalTiming) / float64(result.Total), + MaxResp: result.MaxTiming, + MinResp: result.MinTiming, + AvgTraffic: float64(result.TotalRequest) / float64(result.Total), + MaxTraffic: result.RequestMax, + MinTraffic: result.RequestMin, + } + + resultMap[key] = monCommonData + } + + return resultMap, nil + +} diff --git a/module/monitor/driver/influxdb-v2/flux/buckets_config.go b/module/monitor/driver/influxdb-v2/flux/buckets_config.go new file mode 100644 index 00000000..99106867 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/buckets_config.go @@ -0,0 +1,32 @@ +package flux + +import ( + _ "embed" + + "gopkg.in/yaml.v3" +) + +//go:embed influxdb_config/buckets.yaml +var bucketsData []byte + +var ( + bucketList []*BucketConf +) + +type BucketConf struct { + BucketName string `yaml:"bucket_name"` + Retention int64 `yaml:"retention"` +} + +func initBucketsConfig() { + conf := make([]*BucketConf, 0, 5) + err := yaml.Unmarshal(bucketsData, &conf) + if err != nil { + panic(err) + } + bucketList = conf +} + +func GetBucketConfigList() []*BucketConf { + return bucketList +} diff --git a/module/monitor/driver/influxdb-v2/flux/flux.go b/module/monitor/driver/influxdb-v2/flux/flux.go new file mode 100644 index 00000000..291cc775 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/flux.go @@ -0,0 +1,349 @@ +package flux + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/APIParkLab/APIPark/common" + "github.com/eolinker/eosc/log" + "github.com/influxdata/influxdb-client-go/v2/api" +) + +type IFluxQuery interface { + CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) + CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) + CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) + // CommonQueryOnce 查询只返回一条结果 + CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) + CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error) +} + +type fluxQuery struct { +} + +// CommonStatistics flux查询统计 +func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) { + //拼装请求 + query := f.assembleStatisticsFlux(start, end, bucket, groupBy, filters, statisticsConf, "total", limit) + + log.Info("flux sql=", query) + result, err := queryApi.Query(ctx, query) + if err != nil { + log.Error("flux err=", err) + return nil, err + } + + tempMap := make(map[string]map[string]interface{}) + for result.Next() { + key := "" + if v, ok := result.Record().Values()[groupBy]; ok { + if v == nil { + continue + } + key = v.(string) + } + tempMap[key] = result.Record().Values() + } + result.Close() + + resultMap := make(map[string]*FluxStatistics) + //拼装返回参数 + for key, maps := range tempMap { + total := common.FmtIntFromInterface(maps["total"]) + success := common.FmtIntFromInterface(maps["success"]) + pTotal := common.FmtIntFromInterface(maps["p_total"]) + pSuccess := common.FmtIntFromInterface(maps["p_success"]) + totalTiming := common.FmtIntFromInterface(maps["timing"]) + maxMinTiming := common.FmtIntFromInterface(maps["timing_max"]) + minTiming := common.FmtIntFromInterface(maps["timing_min"]) + totalRequest := common.FmtIntFromInterface(maps["request"]) + maxRequest := common.FmtIntFromInterface(maps["request_max"]) + minRequest := common.FmtIntFromInterface(maps["request_min"]) + + resultMap[key] = &FluxStatistics{ + Total: total, + Success: success, + ProxyTotal: pTotal, + ProxySuccess: pSuccess, + TotalTiming: totalTiming, + MaxTiming: maxMinTiming, + MinTiming: minTiming, + TotalRequest: totalRequest, + RequestMax: maxRequest, + RequestMin: minRequest, + } + } + + return resultMap, nil +} + +// CommonProxyStatistics flux查询统计(只查转发表) +func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error) { + //拼装请求 + query := f.assembleStatisticsFlux(start, end, bucket, groupBy, filters, statisticsConf, "p_total", limit) + + log.Info("flux sql=", query) + result, err := queryApi.Query(ctx, query) + if err != nil { + log.Error("flux err=", err) + return nil, err + } + + tempMap := make(map[string]map[string]interface{}) + for result.Next() { + key := "" + if v, ok := result.Record().Values()[groupBy]; ok { + key = v.(string) + } + tempMap[key] = result.Record().Values() + } + result.Close() + + resultMap := make(map[string]*FluxStatistics) + //拼装返回参数 + for key, maps := range tempMap { + pTotal := common.FmtIntFromInterface(maps["p_total"]) + pSuccess := common.FmtIntFromInterface(maps["p_success"]) + totalTiming := common.FmtIntFromInterface(maps["p_timing"]) + maxMinTiming := common.FmtIntFromInterface(maps["p_timing_max"]) + minTiming := common.FmtIntFromInterface(maps["p_timing_min"]) + totalRequest := common.FmtIntFromInterface(maps["p_request"]) + maxRequest := common.FmtIntFromInterface(maps["p_request_max"]) + minRequest := common.FmtIntFromInterface(maps["p_request_min"]) + + resultMap[key] = &FluxStatistics{ + ProxyTotal: pTotal, + ProxySuccess: pSuccess, + TotalTiming: totalTiming, + MaxTiming: maxMinTiming, + MinTiming: minTiming, + TotalRequest: totalRequest, + RequestMax: maxRequest, + RequestMin: minRequest, + } + } + + return resultMap, nil +} + +func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) { + fieldConditions := f.assembleTendencyFieldCondition(dataFields) + //拼装请求 + query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset) + + log.Info("flux sql=", query) + result, err := queryApi.Query(ctx, query) + if err != nil { + log.Error("flux err=", err) + return nil, nil, err + } + defer result.Close() + + resultList := make([]map[string]interface{}, 0, 10) + for result.Next() { + resultList = append(resultList, result.Record().Values()) + } + //初始返回内容 + dates := make([]time.Time, 0, len(resultList)) + resultMap := make(map[string][]int64, len(dataFields)) + for _, field := range dataFields { + resultMap[field] = make([]int64, 0, len(resultList)) + } + + for _, res := range resultList { + for _, field := range dataFields { + resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field])) + } + t, _ := res["_time"].(time.Time) + dates = append(dates, t) + } + + return dates, resultMap, nil +} + +func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) { + query := f.getCircularMapFlux(start, end, bucket, filters, fieldsConf) + + log.Info("flux sql=", query) + result, err := queryApi.Query(ctx, query) + if err != nil { + log.Error("flux err=", err) + return nil, err + } + + for result.Next() { + return result.Record().Values(), nil + } + //当某个时间段没有记录时,会返回空 + return map[string]interface{}{}, nil +} + +// CommonWarnStatistics flux查询统计(告警数据用) +func (f *fluxQuery) CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error) { + //拼装请求 + query := f.assembleWarnStatisticsFlux(start, end, bucket, groupBy, filters, statisticsConf) + + log.Info("flux sql=", query) + result, err := queryApi.Query(ctx, query) + if err != nil { + log.Error("flux err=", err) + return nil, err + } + + tempMap := make(map[string]map[string]interface{}) + for result.Next() { + key := "" + if v, ok := result.Record().Values()[groupBy]; ok { + key = v.(string) + } + tempMap[key] = result.Record().Values() + } + result.Close() + + resultMap := make(map[string]*FluxWarnStatistics) + + //拼装返回参数 + for key, maps := range tempMap { + resultMap[key] = f.warnFormatFluxResults(maps, statisticsConf.Fields) + } + + return resultMap, nil +} + +// warnFormatFluxResults 格式化告警查询统计的返回数据 +func (f *fluxQuery) warnFormatFluxResults(results map[string]interface{}, fields []string) *FluxWarnStatistics { + result := &FluxWarnStatistics{} + for _, field := range fields { + switch field { + case "total": + result.Total = common.FmtIntFromInterface(results[field]) + case "success": + result.Success = common.FmtIntFromInterface(results[field]) + case "s4xx": + result.S4xx = common.FmtIntFromInterface(results[field]) + case "s5xx": + result.S5xx = common.FmtIntFromInterface(results[field]) + case "p_total": + result.ProxyTotal = common.FmtIntFromInterface(results[field]) + case "p_success": + result.ProxySuccess = common.FmtIntFromInterface(results[field]) + case "p_s4xx": + result.ProxyS4xx = common.FmtIntFromInterface(results[field]) + case "p_s5xx": + result.ProxyS5xx = common.FmtIntFromInterface(results[field]) + case "request": + result.TotalRequest = common.FmtIntFromInterface(results[field]) + case "response": + result.TotalResponse = common.FmtIntFromInterface(results[field]) + case "timing": + result.TotalTiming = common.FmtIntFromInterface(results[field]) + } + } + return result +} + +func (f *fluxQuery) assembleStatisticsFlux(start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, sortBy string, limit int) string { + limitStr := "" + if limit > 0 { + //按请求量降序 + limitStr = fmt.Sprintf(`|> group() |> sort(columns: ["%s"], desc: true) |> limit(n: %d) `, sortBy, limit) + } + + streams := make([]string, 0, len(statisticsConf)) + for _, conf := range statisticsConf { + //拼装过滤的_field + fields := make([]string, 0, len(conf.Fields)) + for _, field := range conf.Fields { + fields = append(fields, fmt.Sprintf(` r["_field"] == "%s" `, field)) + } + //拼装union所需的数据流 + streams = append(streams, fmt.Sprintf(` +from(bucket: "%s") + |> range(start: %d, stop: %d) + |> filter(fn: (r) => r["_measurement"] == "%s") + %s + |> filter(fn: (r) =>%s) + |> group(columns:["%s","_field"])|> %s +`, bucket, start.Unix(), end.Unix(), conf.Measurement, filters, strings.Join(fields, "or"), groupBy, conf.AggregateFn)) + } + + return fmt.Sprintf(` +union(tables: [ +%s +]) +|> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value") +%s +`, strings.Join(streams, ",\n"), groupBy, limitStr) +} + +func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string { + windowOffsetFlux := "" + if windowOffset != "" { + windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset) + } + return fmt.Sprintf(`from(bucket: "%s") + |> range(start: %d, stop: %d) + |> filter(fn: (r) => r["_measurement"] == "%s") + %s + %s + |> group(columns: ["_field"]) + |> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table, + filters, fieldConditions, every, windowOffsetFlux) + +} + +// assembleTendencyFieldCondition 封装趋势图需要的Field数据 +func (f *fluxQuery) assembleTendencyFieldCondition(fieldConditions []string) string { + /* + 比如输入 {"total","success","s4xx","s5xx"} + 返回 |> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "success" or r["_field"] == "s4xx" or r["_field"] == "s5xx") + */ + fields := make([]string, 0, len(fieldConditions)) + for _, field := range fieldConditions { + fields = append(fields, fmt.Sprintf(` r["_field"] == "%s" `, field)) + } + return fmt.Sprintf(`|> filter(fn: (r) => %s )`, strings.Join(fields, "or")) +} + +// 饼状图flux +func (f *fluxQuery) getCircularMapFlux(start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) string { + fields := make([]string, 0, len(fieldsConf.Fields)) + for _, field := range fieldsConf.Fields { + fields = append(fields, fmt.Sprintf(` r["_field"] == "%s" `, field)) + } + + return fmt.Sprintf(` +from(bucket: "%s") + |> range(start: %d, stop: %d) + |> filter(fn: (r) => r["_measurement"] == "%s") + %s + |> filter(fn: (r) =>%s) + |> group(columns:["_field"]) + |> %s + |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), fieldsConf.Measurement, filters, strings.Join(fields, "or"), fieldsConf.AggregateFn) +} + +// assembleWarnStatisticsFlux 组装告警用的统计flux +func (f *fluxQuery) assembleWarnStatisticsFlux(start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) string { + + //拼装过滤的_field + fields := make([]string, 0, len(statisticsConf.Fields)) + for _, field := range statisticsConf.Fields { + fields = append(fields, fmt.Sprintf(` r["_field"] == "%s" `, field)) + } + //拼装union所需的数据流 + return fmt.Sprintf(` +from(bucket: "%s") + |> range(start: %d, stop: %d) + |> filter(fn: (r) => r["_measurement"] == "%s") + %s + |> filter(fn: (r) =>%s) + |> group(columns:["%s","_field"]) + |> %s + |> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value") +`, bucket, start.Unix(), end.Unix(), statisticsConf.Measurement, filters, strings.Join(fields, "or"), groupBy, statisticsConf.AggregateFn, groupBy) + +} diff --git a/module/monitor/driver/influxdb-v2/flux/influxdb_config/buckets.yaml b/module/monitor/driver/influxdb-v2/flux/influxdb_config/buckets.yaml new file mode 100644 index 00000000..cb636c12 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/influxdb_config/buckets.yaml @@ -0,0 +1,15 @@ +- + bucket_name: apinto + retention: 3600 #1个小时 +- + bucket_name: apinto/minute + retention: 604800 #7天 +- + bucket_name: apinto/hour + retention: 7776000 #90天 +- + bucket_name: apinto/day + retention: 157680000 #5年 +- + bucket_name: apinto/week + retention: 0 #永久 \ No newline at end of file diff --git a/module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml b/module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml new file mode 100644 index 00000000..f5f09c49 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/influxdb_config/tasks.yaml @@ -0,0 +1,1418 @@ +- + task_name: "apinto_minute_request_request_v1" + cron: "* * * * *" + offset: "10s" + flux: | + + request_request = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "request") + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + request_request + |> sum() + |> set(key: "_field", value: "request") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_request + |> max() + |> set(key: "_field", value: "request_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + request_request + |> min() + |> set(key: "_field", value: "request_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_request_response_v1" + cron: "* * * * *" + offset: "12s" + flux: | + + request_response = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "response") + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + request_response + |> sum() + |> set(key: "_field", value: "response") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_response + |> max() + |> set(key: "_field", value: "response_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + request_response + |> min() + |> set(key: "_field", value: "response_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_request_retry_v1" + cron: "* * * * *" + offset: "14s" + flux: | + + request_retry = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "retry") + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + request_retry + |> sum() + |> set(key: "_field", value: "retry") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_retry + |> max() + |> set(key: "_field", value: "retry_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + request_retry + |> min() + |> set(key: "_field", value: "retry_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_request_status_v1" + cron: "* * * * *" + offset: "16s" + flux: | + + request_status = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "status") + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + request_status + |> count() + |> set(key: "_field", value: "total") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_status + |> filter(fn: (r) => r._value < 400) + |> count() + |> set(key: "_field", value: "success") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + request_status + |> filter(fn: (r) => r._value >= 400 and r._value < 500) + |> count() + |> set(key: "_field", value: "s4xx") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_status + |> filter(fn: (r) => r._value >= 500) + |> count() + |> set(key: "_field", value: "s5xx") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_request_timing_v1" + cron: "* * * * *" + offset: "18s" + flux: | + + request_timing = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "request") + |> filter(fn: (r) => r._field == "timing") + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "_measurement", + ], + ) + + request_timing + |> sum() + |> set(key: "_field", value: "timing") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + request_timing + |> max() + |> set(key: "_field", value: "timing_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + request_timing + |> min() + |> set(key: "_field", value: "timing_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_proxy_timing_v1" + cron: "* * * * *" + offset: "20s" + flux: | + + proxy_timing = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter(fn: (r) => r._field == "timing") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + proxy_timing + |> sum() + |> set(key: "_field", value: "p_timing") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_timing + |> max() + |> set(key: "_field", value: "p_timing_max") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_timing + |> min() + |> set(key: "_field", value: "p_timing_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_proxy_status_v1" + cron: "* * * * *" + offset: "22s" + flux: | + + proxy_status = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter(fn: (r) => r._field == "status") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + proxy_status + |> count() + |> set(key: "_field", value: "p_total") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_status + |> filter(fn: (r) => r._value < 400) + |> count() + |> set(key: "_field", value: "p_success") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + proxy_status + |> filter(fn: (r) => r._value >= 400 and r._value < 500) + |> count() + |> set(key: "_field", value: "p_s4xx") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_status + |> filter(fn: (r) => r._value >= 500) + |> count() + |> set(key: "_field", value: "p_s5xx") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_proxy_request_v1" + cron: "* * * * *" + offset: "24s" + flux: | + + proxy_request = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter(fn: (r) => r._field == "request") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + + proxy_request + |> sum() + |> set(key: "_field", value: "p_request") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_request + |> max() + |> set(key: "_field", value: "p_request_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + proxy_request + |> min() + |> set(key: "_field", value: "p_request_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_minute_proxy_response_v1" + cron: "* * * * *" + offset: "26s" + flux: | + + proxy_response = + from(bucket: "apinto") + |> range(start: -1m) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter(fn: (r) => r._field == "response") + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_measurement", + ], + ) + proxy_response + |> sum() + |> set(key: "_field", value: "p_response") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + proxy_response + |> max() + |> set(key: "_field", value: "p_response_max") + |> to( + bucket: "apinto/minute", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + timeColumn: "_start", + ) + proxy_response + |> min() + |> set(key: "_field", value: "p_response_min") + |> to( + bucket: "apinto/minute", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_hour_request_v1" + cron: "0 * * * *" + offset: "1m30s" + flux: | + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> sum() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_hour_proxy_v1" + cron: "0 * * * *" + offset: "1m45s" + flux: | + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> sum() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + + from(bucket: "apinto/minute") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + "_measurement", + ], + ) + |> max() + |> to( + bucket: "apinto/hour", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_day_request_v1" + cron: "0 0 * * *" + offset: "2m30s" + flux: | + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_day_proxy_v1" + cron: "0 0 * * *" + offset: "2m45s" + flux: | + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + + from(bucket: "apinto/hour") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/day", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_week_request_v1" + cron: "0 0 * * 1" + offset: "3m30s" + flux: | + + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "total" or r._field == "success" or r._field == "s4xx" or r._field == "s5xx" + or + r._field == "timing" or r._field == "request" or r._field == "response" or r._field + == + "retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_max" or r._field == "request_max" or r._field == "response_max" + or + r._field == "retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "request") + |> filter( + fn: (r) => + r._field == "timing_min" or r._field == "request_min" or r._field == "response_min" + or + r._field == "retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "request") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "node", + "cluster", + "provider", + ], + ) +- + task_name: "apinto_week_proxy_v1" + cron: "0 0 * * 1" + offset: "3m45s" + flux: | + + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_total" or r._field == "p_success" or r._field == "p_s4xx" or r._field + == + "p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field + == + "p_response" or r._field == "p_retry", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> sum() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_max" or r._field == "p_request_max" or r._field + == + "p_response_max" or r._field == "p_retry_max", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) + from(bucket: "apinto/day") + |> range(start: -1w) + |> filter(fn: (r) => r._measurement == "proxy") + |> filter( + fn: (r) => + r._field == "p_timing_min" or r._field == "p_request_min" or r._field + == + "p_response_min" or r._field == "p_retry_min", + ) + |> group( + columns: [ + "api", + "app", + "upstream", + "addr", + "method", + "node", + "cluster", + "provider", + "_field", + ], + ) + |> max() + |> set(key: "_measurement", value: "proxy") + |> to( + bucket: "apinto/week", + timeColumn: "_start", + tagColumns: [ + "api", + "app", + "method", + "upstream", + "addr", + "node", + "cluster", + "provider", + ], + ) \ No newline at end of file diff --git a/module/monitor/driver/influxdb-v2/flux/init.go b/module/monitor/driver/influxdb-v2/flux/init.go new file mode 100644 index 00000000..ce040b0e --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/init.go @@ -0,0 +1,18 @@ +package flux + +import ( + "reflect" + + "github.com/eolinker/go-common/autowire" +) + +func init() { + autowire.Auto[IFluxQuery](func() reflect.Value { + return reflect.ValueOf(new(fluxQuery)) + }) + + //初始化buckets配置 + initBucketsConfig() + //初始化tasks定时脚本配置 + initTasksConfig() +} diff --git a/module/monitor/driver/influxdb-v2/flux/monitor_flux.go b/module/monitor/driver/influxdb-v2/flux/monitor_flux.go new file mode 100644 index 00000000..8acbd155 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/monitor_flux.go @@ -0,0 +1,37 @@ +package flux + +// FluxStatistics flux统计通用字段 +type FluxStatistics struct { + Total int64 `json:"total"` //总数 + Success int64 `json:"success"` //成功数 + ProxyTotal int64 `json:"p_total"` //转发总数 + ProxySuccess int64 `json:"p_success"` //转发成功数 + TotalTiming int64 `json:"timing"` //平均响应时间 + MaxTiming int64 `json:"timing_max"` //最大响应时间 + MinTiming int64 `json:"timing_min"` //最小响应时间 + TotalRequest int64 `json:"request"` //总请求流量 + RequestMax int64 `json:"request_max"` //最大流量 + RequestMin int64 `json:"request_min"` //最小流量 +} + +// FluxWarnStatistics flux统计告警通用字段 +type FluxWarnStatistics struct { + Total int64 `json:"total"` //总数 + Success int64 `json:"success"` //成功数 + S4xx int64 `json:"s4xx"` + S5xx int64 `json:"s5xx"` + ProxyTotal int64 `json:"p_total"` //转发总数 + ProxySuccess int64 `json:"p_success"` //转发成功数 + ProxyS4xx int64 `json:"p_s4xx"` + ProxyS5xx int64 `json:"p_s5xx"` + TotalTiming int64 `json:"timing"` //平均响应时间 + TotalRequest int64 `json:"request"` //总请求流量 + TotalResponse int64 `json:"response"` //总响应流量 +} + +// StatisticsFilterConf 统计表过滤_field的配置 +type StatisticsFilterConf struct { + Measurement string + AggregateFn string + Fields []string +} diff --git a/module/monitor/driver/influxdb-v2/flux/tasks_config.go b/module/monitor/driver/influxdb-v2/flux/tasks_config.go new file mode 100644 index 00000000..54b595a4 --- /dev/null +++ b/module/monitor/driver/influxdb-v2/flux/tasks_config.go @@ -0,0 +1,34 @@ +package flux + +import ( + _ "embed" + + "gopkg.in/yaml.v3" +) + +//go:embed influxdb_config/tasks.yaml +var tasksData []byte + +var ( + taskList []*TaskConf +) + +type TaskConf struct { + TaskName string `yaml:"task_name"` + Cron string `yaml:"cron"` + Offset string `yaml:"offset"` + Flux string `yaml:"flux"` +} + +func initTasksConfig() { + conf := make([]*TaskConf, 0, 15) + err := yaml.Unmarshal(tasksData, &conf) + if err != nil { + panic(err) + } + taskList = conf +} + +func GetTaskConfigList() []*TaskConf { + return taskList +} diff --git a/module/monitor/driver/influxdb-v2/influxdb-v2.go b/module/monitor/driver/influxdb-v2/influxdb-v2.go new file mode 100644 index 00000000..c85b3d9e --- /dev/null +++ b/module/monitor/driver/influxdb-v2/influxdb-v2.go @@ -0,0 +1,61 @@ +package influxdb_v2 + +import ( + "encoding/json" + "fmt" + "net/url" + + "github.com/eolinker/go-common/autowire" + + "github.com/APIParkLab/APIPark/module/monitor/driver/influxdb-v2/flux" + + "github.com/APIParkLab/APIPark/module/monitor/driver" +) + +var _ driver.IDriver = (*influxdbV2)(nil) + +const ( + name = "influxdb-v2" +) + +func init() { + d := newInfluxdbV2() + autowire.Autowired(d) + driver.Register(d) +} + +type InfluxdbV2Config struct { + Addr string `json:"addr"` + Token string `json:"token"` + Org string `json:"org"` +} + +func newInfluxdbV2() *influxdbV2 { + return &influxdbV2{} +} + +type influxdbV2 struct { + fluxQuery flux.IFluxQuery `autowired:""` +} + +func (i *influxdbV2) Name() string { + return name +} + +func (i *influxdbV2) Check(cfg string) error { + var data InfluxdbV2Config + err := json.Unmarshal([]byte(cfg), &data) + if err != nil { + return err + } + + _, err = url.Parse(data.Addr) + if err != nil { + return fmt.Errorf("addr is invalid") + } + return nil +} + +func (i *influxdbV2) Create(cfg string) (driver.IExecutor, error) { + return newExecutor(cfg, i.fluxQuery) +} diff --git a/module/monitor/driver/influxdb-v2/util.go b/module/monitor/driver/influxdb-v2/util.go new file mode 100644 index 00000000..7021f5bf --- /dev/null +++ b/module/monitor/driver/influxdb-v2/util.go @@ -0,0 +1,189 @@ +package influxdb_v2 + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/APIParkLab/APIPark/service/monitor" +) + +const ( + oneHour = 3600 + oneDay = 24 * oneHour + tenDay = 10 * oneDay + oneYear = 365 * oneDay + + bucketMinuteRetention = (7 - 1) * oneDay + bucketHourRetention = (90 - 1) * oneDay + bucketDayRetention = (5*365 - 1) * oneDay + + bucketMinute = "apinto/minute" + bucketHour = "apinto/hour" + bucketDay = "apinto/day" + bucketWeek = "apinto/week" + + timeZone = "Asia/Shanghai" +) + +// formatStartTimeHour 将time格式化为小时整 +func formatStartTimeHour(t time.Time, location *time.Location) time.Time { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, location) +} + +// formatStartTimeDay 将time格式化为天整 +func formatStartTimeDay(t time.Time, location *time.Location) time.Time { + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, location) +} + +// formatStartTimeDay 将startTime向前移到周一,因为week桶里的time是每个周一才有数据 +func formatStartTimeToMonday(t time.Time, location *time.Location) time.Time { + var dayDiff int + switch t.Weekday() { + case time.Monday: + dayDiff = 0 + case time.Tuesday: + dayDiff = -1 + case time.Wednesday: + dayDiff = -2 + case time.Thursday: + dayDiff = -3 + case time.Friday: + dayDiff = -4 + case time.Saturday: + dayDiff = -5 + case time.Sunday: + dayDiff = -6 + } + + return time.Date(t.Year(), t.Month(), t.Day()+dayDiff, 0, 0, 0, 0, location) +} + +// FormatFloat64 float64保留places位小数 +func FormatFloat64(f float64, places int) float64 { + formatStr := "%." + strconv.Itoa(places) + "f" + f64, _ := strconv.ParseFloat(fmt.Sprintf(formatStr, f), 64) + return f64 +} + +func formatFilter(wheres []monitor.MonWhereItem) string { + filter := `` + if len(wheres) > 0 { + filters := make([]string, 0, len(wheres)) + for _, where := range wheres { + if len(where.Values) > 0 { + wl := make([]string, 0, len(where.Values)) + for _, v := range where.Values { + wl = append(wl, fmt.Sprintf(fmt.Sprintf(`r["%s"] == "%s"`, where.Key, v))) + } + filters = append(filters, fmt.Sprint("(", strings.Join(wl, " or "), ")")) + } + } + filter = fmt.Sprint(`|> filter(fn:(r)=>`, strings.Join(filters, " and "), ")") + } + return filter +} + +// getTimeIntervalAndBucket 根据start和end来获取窗口时间间隔,窗口偏移量offset,以及使用的bucket, 查询的startTime也会格式化 +func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string, string, string) { + //根据start距离现在的时长算出可使用的最小桶 + minimumBucket := "" + startToNow := time.Now().Unix() - startTime.Unix() + if startToNow <= bucketMinuteRetention { + minimumBucket = bucketMinute + } else if startToNow <= bucketHourRetention { + minimumBucket = bucketHour + } else if startToNow <= bucketDayRetention { + minimumBucket = bucketDay + } else { + minimumBucket = bucketWeek + } + + //结合可使用的最小桶,根据end-start时间间隔来得出合适的桶和趋势图时间间隔 + diff := endTime.Unix() - startTime.Unix() + location, _ := time.LoadLocation(timeZone) + if diff <= oneHour { + + switch minimumBucket { + case bucketMinute: + return startTime, "1m", "", bucketMinute + case bucketHour: + //start变成小时整 + newStart := formatStartTimeHour(startTime, location) + return newStart, "1h", "", bucketHour + case bucketDay: + //start 变成一天整 + newStart := formatStartTimeDay(startTime, location) + return newStart, "1d", "", bucketDay + case bucketWeek: + //将startTime往前顺延到周一 + newStart := formatStartTimeToMonday(startTime, location) + return newStart, "1w", "-4d", bucketWeek + } + + } else if diff <= oneDay { + + switch minimumBucket { + case bucketMinute: + offset := "" + offsetTime := startTime.Minute() % 5 + if offsetTime != 0 { + offset = fmt.Sprintf("%dm", offsetTime) + } + return startTime, "5m", offset, bucketMinute + + case bucketHour: + newStart := formatStartTimeHour(startTime, location) + return newStart, "1h", "", bucketHour + case bucketDay: + newStart := formatStartTimeDay(startTime, location) + return newStart, "1d", "", bucketDay + case bucketWeek: + //将startTime往前顺延到周一 + newStart := formatStartTimeToMonday(startTime, location) + return newStart, "1w", "-4d", bucketWeek + } + + } else if diff <= tenDay { + + switch minimumBucket { + case bucketMinute, bucketHour: + newStart := formatStartTimeHour(startTime, location) + return newStart, "1h", "", bucketHour + case bucketDay: + newStart := formatStartTimeDay(startTime, location) + return newStart, "1d", "", bucketDay + case bucketWeek: + //将startTime往前顺延到周一 + newStart := formatStartTimeToMonday(startTime, location) + return newStart, "1w", "-4d", bucketWeek + } + + } else if diff < oneYear { + + switch minimumBucket { + case bucketMinute, bucketHour, bucketDay: + newStart := formatStartTimeDay(startTime, location) + return newStart, "1d", "", bucketDay + case bucketWeek: + //将startTime往前顺延到周一 + newStart := formatStartTimeToMonday(startTime, location) + return newStart, "1w", "-4d", bucketWeek + } + + } + + //end-start大于1年 时间间隔为1周 + //将startTime往前顺延到周一 + newStart := formatStartTimeToMonday(startTime, location) + return newStart, "1w", "-4d", bucketWeek +} + +func formatMessageTrendData(data []int64) []float64 { + floatData := make([]float64, 0, len(data)) + for _, d := range data { + floatData = append(floatData, FormatFloat64(float64(d)/1024, 2)) + } + return floatData +} diff --git a/module/monitor/driver/manager.go b/module/monitor/driver/manager.go new file mode 100644 index 00000000..03babe26 --- /dev/null +++ b/module/monitor/driver/manager.go @@ -0,0 +1,61 @@ +package driver + +import ( + "errors" + + "github.com/eolinker/eosc" +) + +var ( + ErrDriverNotFound = errors.New("driver not found") +) + +type Manager struct { + drivers eosc.Untyped[string, IDriver] +} + +var ( + manager = NewManager() +) + +func NewManager() *Manager { + return &Manager{ + drivers: eosc.BuildUntyped[string, IDriver](), + } +} + +func (m *Manager) Register(driver IDriver) { + m.drivers.Set(driver.Name(), driver) +} + +func (m *Manager) Get(name string) (IDriver, bool) { + return m.drivers.Get(name) +} + +func (m *Manager) Names() []string { + return m.drivers.Keys() +} + +func Get(name string) (IDriver, bool) { + return manager.Get(name) +} + +func Register(driver IDriver) { + manager.Register(driver) +} + +func CreateExecutor(name string, cfg string) (IExecutor, error) { + d, has := manager.Get(name) + if !has { + return nil, ErrDriverNotFound + } + return d.Create(cfg) +} + +func Check(name string, cfg string) error { + d, has := manager.Get(name) + if !has { + return ErrDriverNotFound + } + return d.Check(cfg) +} diff --git a/module/monitor/dto/input.go b/module/monitor/dto/input.go new file mode 100644 index 00000000..7de6a169 --- /dev/null +++ b/module/monitor/dto/input.go @@ -0,0 +1,35 @@ +package monitor_dto + +type MonWhereItem struct { + Key string + Operation string // 表达式,默认为 =,多个为 in,可以用其他 + Values []string +} + +const ( + DataTypeApi = "api" + DataTypeProvider = "provider" + DataTypeSubscriber = "subscriber" +) + +type Top10Input struct { + *CommonInput + DataType string `json:"data_type"` +} + +type CommonInput struct { + Start int64 `json:"start"` + End int64 `json:"end"` +} + +type StatisticInput struct { + Apis []string `json:"apis"` + Projects []string `json:"projects"` + Path string `json:"path"` + *CommonInput +} + +type SaveMonitorConfig struct { + Driver string `json:"driver"` + Config map[string]interface{} `json:"config"` +} diff --git a/module/monitor/dto/output.go b/module/monitor/dto/output.go new file mode 100644 index 00000000..0bf769cb --- /dev/null +++ b/module/monitor/dto/output.go @@ -0,0 +1,134 @@ +package monitor_dto + +import ( + "time" + + "github.com/APIParkLab/APIPark/service/monitor" + + "github.com/eolinker/go-common/auto" +) + +type ApiStatisticItem struct { + *ApiStatisticBasicItem + IsRed bool `json:"is_red"` //是否标红 +} + +type ApiStatisticBasicItem struct { + Id string `json:"id"` //apiID + Name string `json:"name"` //api名称 + Path string `json:"path"` + Service auto.Label `json:"service" aolabel:"service"` + *MonCommonData +} + +type ProjectStatisticItem struct { + *ProjectStatisticBasicItem + IsRed bool `json:"is_red"` //是否标红 +} + +type ProjectStatisticBasicItem struct { + Id string `json:"id"` //订阅方ID + Name string `json:"name"` //订阅方名称 + *MonCommonData +} + +// MonSummaryOutput 请求/转发统计 +type MonSummaryOutput struct { + Total int64 `json:"total"` // 请求总数 + Success int64 `json:"success"` //请求成功数 + Fail int64 `json:"fail"` //请求失败数 + Status4Xx int64 `json:"status_4xx"` //状态码4xx数 + Status5Xx int64 `json:"status_5xx"` //状态码5xx数 +} + +func ToMonSummaryOutput(output *monitor.Summary) *MonSummaryOutput { + return &MonSummaryOutput{ + Total: output.Total, + Success: output.Success, + Fail: output.Fail, + Status4Xx: output.Status4Xx, + Status5Xx: output.Status5Xx, + } +} + +type MonMessageTrend struct { + Dates []time.Time `json:"dates"` + ReqMessage []float64 `json:"req_message"` + RespMessage []float64 `json:"resp_message"` +} + +func ToMonMessageTrend(item *monitor.MonMessageTrend) *MonMessageTrend { + return &MonMessageTrend{ + Dates: item.Dates, + ReqMessage: item.ReqMessage, + RespMessage: item.RespMessage, + } +} + +// MonCommonData 通用字段 +type MonCommonData struct { + RequestTotal int64 `json:"request_total"` //请求总数 + RequestSuccess int64 `json:"request_success"` //请求成功数 + RequestRate float64 `json:"request_rate"` //请求成功率 + ProxyTotal int64 `json:"proxy_total"` //转发总数 + ProxySuccess int64 `json:"proxy_success"` //转发成功数 + ProxyRate float64 `json:"proxy_rate"` //转发成功率 + StatusFail int64 `json:"status_fail"` //失败状态数 + AvgResp float64 `json:"avg_resp"` //平均响应时间 + MaxResp int64 `json:"max_resp"` //最大响应时间 + MinResp int64 `json:"min_resp"` //最小响应时间 + AvgTraffic float64 `json:"avg_traffic"` //平均流量 + MaxTraffic int64 `json:"max_traffic"` //最大流量 + MinTraffic int64 `json:"min_traffic"` //最小流量 +} + +func ToMonCommonData(item monitor.MonCommonData) *MonCommonData { + return &MonCommonData{ + RequestTotal: item.RequestTotal, + RequestSuccess: item.RequestSuccess, + RequestRate: item.RequestRate, + ProxyTotal: item.ProxyTotal, + ProxySuccess: item.ProxySuccess, + ProxyRate: item.ProxyRate, + StatusFail: item.StatusFail, + AvgResp: item.AvgResp, + MaxResp: item.MaxResp, + MinResp: item.MinResp, + AvgTraffic: item.AvgTraffic, + MaxTraffic: item.MaxTraffic, + MinTraffic: item.MinTraffic, + } +} + +type MonInvokeCountTrend struct { + Date []time.Time `json:"date"` + Status5XX []int64 `json:"status_5xx"` + Status4XX []int64 `json:"status_4xx"` + ProxyRate []float64 `json:"proxy_rate"` + ProxyTotal []int64 `json:"proxy_total"` + RequestRate []float64 `json:"request_rate"` + RequestTotal []int64 `json:"request_total"` +} + +func ToMonInvokeCountTrend(item *monitor.MonInvokeCountTrend) *MonInvokeCountTrend { + return &MonInvokeCountTrend{ + Date: item.Date, + Status5XX: item.Status5XX, + Status4XX: item.Status4XX, + ProxyRate: item.ProxyRate, + ProxyTotal: item.ProxyTotal, + RequestRate: item.RequestRate, + RequestTotal: item.RequestTotal, + } +} + +type MonMessageChart struct { + Date []time.Time `json:"date"` + Request []float64 `json:"request"` + Response []float64 `json:"response"` +} + +type MonitorConfig struct { + Driver string `json:"driver"` + Config map[string]interface{} `json:"config"` +} diff --git a/module/monitor/iml.go b/module/monitor/iml.go new file mode 100644 index 00000000..d3f66f17 --- /dev/null +++ b/module/monitor/iml.go @@ -0,0 +1,405 @@ +package monitor + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/APIParkLab/APIPark/gateway" + "github.com/eolinker/eosc/log" + "github.com/eolinker/go-common/auto" + "github.com/eolinker/go-common/store" + "github.com/eolinker/go-common/utils" + "gorm.io/gorm" + "sort" + "time" + + "github.com/APIParkLab/APIPark/service/service" + + "github.com/APIParkLab/APIPark/service/subscribe" + + "github.com/APIParkLab/APIPark/service/cluster" + + "github.com/APIParkLab/APIPark/module/monitor/driver" + + "github.com/APIParkLab/APIPark/service/api" + + "github.com/APIParkLab/APIPark/service/monitor" + + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" +) + +var ( + _ IMonitorStatisticModule = (*imlMonitorStatisticModule)(nil) +) + +type imlMonitorStatisticModule struct { + monitorStatisticCacheService monitor.IMonitorStatisticsCache `autowired:""` + subscribeService subscribe.ISubscribeService `autowired:""` + serviceService service.IServiceService `autowired:""` + clusterService cluster.IClusterService `autowired:""` + monitorService monitor.IMonitorService `autowired:""` + apiService api.IAPIService `autowired:""` +} + +func (i *imlMonitorStatisticModule) MessageTrend(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonMessageTrend, string, error) { + clusterId := cluster.DefaultClusterID + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, "", err + } + executor, err := i.getExecutor(ctx, clusterId) + if err != nil { + return nil, "", err + } + result, timeInterval, err := executor.MessageTrend(ctx, formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres) + if err != nil { + return nil, "", err + } + return monitor_dto.ToMonMessageTrend(result), timeInterval, nil +} + +func (i *imlMonitorStatisticModule) InvokeTrend(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error) { + clusterId := cluster.DefaultClusterID + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, "", err + } + executor, err := i.getExecutor(ctx, clusterId) + if err != nil { + return nil, "", err + } + result, timeInterval, err := executor.InvokeTrend(ctx, formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres) + if err != nil { + return nil, "", err + } + return monitor_dto.ToMonInvokeCountTrend(result), timeInterval, nil +} + +func (i *imlMonitorStatisticModule) genCommonWheres(ctx context.Context, clusterIds ...string) ([]monitor.MonWhereItem, error) { + + clusters, err := i.clusterService.List(ctx, clusterIds...) + if err != nil { + return nil, err + } + clusterIds = utils.SliceToSlice(clusters, func(item *cluster.Cluster) string { + return item.Uuid + }) + + wheres := make([]monitor.MonWhereItem, 0, 1) + + wheres = append(wheres, monitor.MonWhereItem{ + Key: "cluster", + Operation: "=", + Values: clusterIds, + }) + + return wheres, nil +} + +func (i *imlMonitorStatisticModule) statistics(ctx context.Context, clusterId string, groupBy string, start, end time.Time, wheres []monitor.MonWhereItem, limit int) (map[string]monitor.MonCommonData, error) { + statisticMap, _ := i.monitorStatisticCacheService.GetStatisticsCache(ctx, clusterId, start, end, groupBy, wheres, limit) + if len(statisticMap) > 0 { + return statisticMap, nil + } + + executor, err := i.getExecutor(ctx, clusterId) + if err != nil { + return nil, err + } + + result, err := executor.CommonStatistics(ctx, start, end, groupBy, limit, wheres) + if err != nil { + return nil, err + } + i.monitorStatisticCacheService.SetStatisticsCache(ctx, clusterId, start, end, groupBy, wheres, limit, result) + return result, nil +} + +func (i *imlMonitorStatisticModule) TopAPIStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ApiStatisticItem, error) { + clusterId := cluster.DefaultClusterID + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, err + } + + statisticMap, err := i.statistics(ctx, clusterId, "api", formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres, limit) + if err != nil { + return nil, err + } + + uuids := utils.MapToSlice(statisticMap, func(key string, value monitor.MonCommonData) string { + return value.ID + }) + apis, err := i.apiService.ListInfo(ctx, uuids...) + if err != nil { + return nil, err + } + apiMap := utils.SliceToMap(apis, func(t *api.Info) string { + return t.UUID + }) + result := make([]*monitor_dto.ApiStatisticItem, 0, len(statisticMap)) + for key, item := range statisticMap { + statisticItem := &monitor_dto.ApiStatisticItem{ + ApiStatisticBasicItem: &monitor_dto.ApiStatisticBasicItem{ + Id: key, + MonCommonData: monitor_dto.ToMonCommonData(item), + }, + } + if a, ok := apiMap[item.ID]; ok { + statisticItem.Name = a.Name + statisticItem.Path = a.Path + statisticItem.Service = auto.UUID(a.Service) + } else { + statisticItem.IsRed = true + if key == "-" { + statisticItem.Name = "无API" + } else { + statisticItem.Name = fmt.Sprintf("未知API-%s", key) + } + } + result = append(result, statisticItem) + } + sort.Slice(result, func(i, j int) bool { + return result[i].RequestTotal > result[j].RequestTotal + }) + return result, nil + +} + +func (i *imlMonitorStatisticModule) TopSubscriberStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ProjectStatisticItem, error) { + clusterId := cluster.DefaultClusterID + _, err := i.clusterService.Get(ctx, clusterId) + if err != nil { + return nil, err + } + return i.topProjectStatistics(ctx, clusterId, "app", input, limit) +} + +func (i *imlMonitorStatisticModule) TopProviderStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ProjectStatisticItem, error) { + clusterId := cluster.DefaultClusterID + _, err := i.clusterService.Get(ctx, clusterId) + if err != nil { + return nil, err + } + return i.topProjectStatistics(ctx, clusterId, "provider", input, limit) +} + +func (i *imlMonitorStatisticModule) topProjectStatistics(ctx context.Context, partitionId string, groupBy string, input *monitor_dto.CommonInput, limit int) ([]*monitor_dto.ProjectStatisticItem, error) { + wheres, err := i.genCommonWheres(ctx, partitionId) + if err != nil { + return nil, err + } + statisticMap, err := i.statistics(ctx, partitionId, groupBy, formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres, limit) + if err != nil { + return nil, err + } + var projects []*service.Service + switch groupBy { + case "app": + projects, err = i.serviceService.AppList(ctx) + case "provider": + projects, err = i.serviceService.ServiceList(ctx) + default: + return nil, errors.New("invalid group by") + } + if err != nil { + return nil, err + } + projectMap := utils.SliceToMap(projects, func(t *service.Service) string { + return t.Id + }) + + result := make([]*monitor_dto.ProjectStatisticItem, 0, len(statisticMap)) + for key, item := range statisticMap { + statisticItem := &monitor_dto.ProjectStatisticItem{ + ProjectStatisticBasicItem: &monitor_dto.ProjectStatisticBasicItem{ + Id: key, + MonCommonData: monitor_dto.ToMonCommonData(item), + }, + } + if a, ok := projectMap[item.ID]; ok { + statisticItem.Name = a.Name + } else { + statisticItem.IsRed = true + if key == "-" { + statisticItem.Name = "无系统" + } else { + statisticItem.Name = fmt.Sprintf("未知系统-%s", key) + } + } + result = append(result, statisticItem) + } + sort.Slice(result, func(i, j int) bool { + return result[i].RequestTotal > result[j].RequestTotal + }) + return result, nil +} +func (i *imlMonitorStatisticModule) getExecutor(ctx context.Context, clusterId string) (driver.IExecutor, error) { + info, err := i.monitorService.GetByCluster(ctx, clusterId) + if err != nil { + return nil, err + } + return driver.CreateExecutor(info.Driver, info.Config) +} + +func (i *imlMonitorStatisticModule) RequestSummary(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, error) { + clusterId := cluster.DefaultClusterID + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, err + } + executor, err := i.getExecutor(ctx, clusterId) + if err != nil { + return nil, err + + } + summary, err := executor.RequestSummary(ctx, formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres) + if err != nil { + return nil, err + } + + return monitor_dto.ToMonSummaryOutput(summary), nil +} + +func (i *imlMonitorStatisticModule) ProxySummary(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, error) { + clusterId := cluster.DefaultClusterID + wheres, err := i.genCommonWheres(ctx, clusterId) + if err != nil { + return nil, err + } + executor, err := i.getExecutor(ctx, clusterId) + if err != nil { + return nil, err + + } + summary, err := executor.ProxySummary(ctx, formatTimeByMinute(input.Start), formatTimeByMinute(input.End), wheres) + if err != nil { + return nil, err + } + + return monitor_dto.ToMonSummaryOutput(summary), nil +} + +var ( + _ IMonitorConfigModule = (*imlMonitorConfig)(nil) +) + +type imlMonitorConfig struct { + clusterService cluster.IClusterService `autowired:""` + monitorService monitor.IMonitorService `autowired:""` + transaction store.ITransaction `autowired:""` +} + +func (m *imlMonitorConfig) dynamicClient(ctx context.Context, clusterId string, resource string, f func(gateway.IDynamicClient) error) error { + client, err := m.clusterService.GatewayClient(ctx, clusterId) + if err != nil { + return err + } + defer func() { + err := client.Close(ctx) + if err != nil { + log.Warn("close apinto client:", err) + } + }() + dynamic, err := client.Dynamic(resource) + if err != nil { + return err + } + return f(dynamic) +} + +func (m *imlMonitorConfig) SaveMonitorConfig(ctx context.Context, cfg *monitor_dto.SaveMonitorConfig) (*monitor_dto.MonitorConfig, error) { + clusterId := cluster.DefaultClusterID + _, err := m.clusterService.Get(ctx, clusterId) + if err != nil { + return nil, err + } + + data, _ := json.Marshal(cfg.Config) + err = driver.Check(cfg.Driver, string(data)) + if err != nil { + return nil, err + } + + executor, err := driver.CreateExecutor(cfg.Driver, string(data)) + if err != nil { + return nil, err + } + err = executor.Init(ctx) + if err != nil { + return nil, err + } + clusters, err := m.clusterService.ListByClusters(ctx, clusterId) + if err != nil { + return nil, err + } + version := time.Now().Format("20060102150405") + id := fmt.Sprintf("%s_influxdb", clusterId) + for _, c := range clusters { + err := m.dynamicClient(ctx, c.Uuid, "influxdbv2", func(client gateway.IDynamicClient) error { + pubCfg := &gateway.DynamicRelease{ + BasicItem: &gateway.BasicItem{ + ID: id, + Description: "", + Version: version, + MatchLabels: map[string]string{ + "module": "monitor", + }, + }, + Attr: map[string]interface{}{ + "org": cfg.Config["org"], + "token": cfg.Config["token"], + "url": cfg.Config["addr"], + "bucket": "apinto", + "scopes": []string{"monitor"}, + }, + } + return client.Online(ctx, pubCfg) + }) + if err != nil { + return nil, err + } + + } + + err = m.monitorService.Save(ctx, &monitor.SaveMonitor{ + Cluster: clusterId, + Driver: cfg.Driver, + Config: string(data), + }) + if err != nil { + return nil, err + } + + return m.GetMonitorConfig(ctx) +} + +func (m *imlMonitorConfig) GetMonitorConfig(ctx context.Context) (*monitor_dto.MonitorConfig, error) { + clusterId := cluster.DefaultClusterID + _, err := m.clusterService.Get(ctx, clusterId) + if err != nil { + return nil, err + + } + info, err := m.monitorService.GetByCluster(ctx, clusterId) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err + } + return &monitor_dto.MonitorConfig{ + Driver: "influxdb-v2", + Config: map[string]interface{}{}, + }, nil + } + cfg := make(map[string]interface{}) + err = json.Unmarshal([]byte(info.Config), &cfg) + if err != nil { + return nil, err + } + return &monitor_dto.MonitorConfig{ + Driver: info.Driver, + Config: cfg, + }, nil + return nil, nil +} diff --git a/module/monitor/monitor.go b/module/monitor/monitor.go new file mode 100644 index 00000000..1a9fe2ad --- /dev/null +++ b/module/monitor/monitor.go @@ -0,0 +1,48 @@ +package monitor + +import ( + "context" + "reflect" + "time" + + "github.com/eolinker/go-common/autowire" + + _ "github.com/APIParkLab/APIPark/module/monitor/driver/influxdb-v2" + monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto" +) + +type IMonitorStatisticModule interface { + TopAPIStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ApiStatisticItem, error) + TopProviderStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ProjectStatisticItem, error) + TopSubscriberStatistics(ctx context.Context, limit int, input *monitor_dto.CommonInput) ([]*monitor_dto.ProjectStatisticItem, error) + // RequestSummary 请求概况 + RequestSummary(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, error) + // ProxySummary 转发概况 + ProxySummary(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonSummaryOutput, error) + + // InvokeTrend 调用次数趋势 + InvokeTrend(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error) + + // MessageTrend 消息趋势 + MessageTrend(ctx context.Context, input *monitor_dto.CommonInput) (*monitor_dto.MonMessageTrend, string, error) +} + +type IMonitorConfigModule interface { + SaveMonitorConfig(ctx context.Context, cfg *monitor_dto.SaveMonitorConfig) (*monitor_dto.MonitorConfig, error) + GetMonitorConfig(ctx context.Context) (*monitor_dto.MonitorConfig, error) +} + +func init() { + autowire.Auto[IMonitorStatisticModule](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorStatisticModule)) + }) + + autowire.Auto[IMonitorConfigModule](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorConfig)) + }) +} +func formatTimeByMinute(org int64) time.Time { + t := time.Unix(org, 0) + location, _ := time.LoadLocation("Asia/Shanghai") + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, location) +} diff --git a/module/publish/dto/out.go b/module/publish/dto/out.go index bcfa4d37..23dda7dd 100644 --- a/module/publish/dto/out.go +++ b/module/publish/dto/out.go @@ -22,7 +22,7 @@ type Publish struct { } func FromModel(m *publish.Publish, versionRemark string) *Publish { - + p := &Publish{ Id: m.Id, Version: m.Version, @@ -32,7 +32,7 @@ func FromModel(m *publish.Publish, versionRemark string) *Publish { Service: auto.UUID(m.Service), Applicant: auto.UUID(m.Applicant), Release: auto.UUID(m.Release), - + Status: m.Status, ApplyTIme: auto.TimeLabel(m.ApplyTime), ApproveTime: auto.TimeLabel(m.ApproveTime), @@ -53,7 +53,7 @@ type PublishDetail struct { } type PublishStatus struct { - //Partition auto.Label `json:"partition" aolabel:"partition"` + //Cluster auto.Label `json:"partition" aolabel:"partition"` //Cluster auto.Label `json:"cluster" aolabel:"cluster"` Status string `json:"status"` Error string `json:"error"` diff --git a/module/service-diff/iml.go b/module/service-diff/iml.go index 857d5524..12248178 100644 --- a/module/service-diff/iml.go +++ b/module/service-diff/iml.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - + "github.com/APIParkLab/APIPark/service/api" "github.com/APIParkLab/APIPark/service/cluster" "github.com/APIParkLab/APIPark/service/release" @@ -26,9 +26,9 @@ func (m *imlServiceDiff) Diff(ctx context.Context, serviceId string, baseRelease if targetRelease == "" { return nil, fmt.Errorf("target release is required") } - + var target *projectInfo - + targetReleaseValue, err := m.releaseService.GetRelease(ctx, targetRelease) if err != nil { return nil, fmt.Errorf("get target release failed:%w", err) @@ -36,7 +36,7 @@ func (m *imlServiceDiff) Diff(ctx context.Context, serviceId string, baseRelease if targetReleaseValue.Service != serviceId { return nil, errors.New("project not match") } - + target, err = m.getReleaseInfo(ctx, targetRelease) if err != nil { return nil, err @@ -55,7 +55,7 @@ func (m *imlServiceDiff) Diff(ctx context.Context, serviceId string, baseRelease }) diff := m.diff(clusterIds, base, target) return diff, nil - + } func (m *imlServiceDiff) getBaseInfo(ctx context.Context, serviceId, baseRelease string) (*projectInfo, error) { if baseRelease == "" { @@ -72,16 +72,16 @@ func (m *imlServiceDiff) getBaseInfo(ctx context.Context, serviceId, baseRelease if err != nil { return nil, fmt.Errorf("get base release info failed:%w", err) } - + return base, nil } func (m *imlServiceDiff) DiffForLatest(ctx context.Context, serviceId string, baseRelease string) (*service_diff.Diff, bool, error) { - + apis, err := m.apiService.ListForService(ctx, serviceId) if err != nil { return nil, false, err } - + apiIds := utils.SliceToSlice(apis, func(i *api.API) string { return i.UUID }) @@ -97,12 +97,12 @@ func (m *imlServiceDiff) DiffForLatest(ctx context.Context, serviceId string, ba if err != nil { return nil, false, err } - + upstreamCommits, err := m.upstreamService.ListLatestCommit(ctx, serviceId) if err != nil { return nil, false, err } - + base, err := m.getBaseInfo(ctx, serviceId, baseRelease) if err != nil { return nil, false, err @@ -128,7 +128,7 @@ func (m *imlServiceDiff) getReleaseInfo(ctx context.Context, releaseId string) ( if err != nil { return nil, err } - + apiIds := utils.SliceToSlice(commits, func(i *release.ProjectCommits) string { return i.Target }, func(c *release.ProjectCommits) bool { @@ -187,14 +187,14 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s baseAPIDoc := utils.SliceToMap(base.apiDocs, func(i *commit.Commit[api.Document]) string { return i.Target }) - + targetApiProxy := utils.SliceToMap(target.apiCommits, func(i *commit.Commit[api.Proxy]) string { return i.Target }) targetAPIDoc := utils.SliceToMap(target.apiDocs, func(i *commit.Commit[api.Document]) string { return i.Target }) - + for _, apiInfo := range target.apis { apiId := apiInfo.UUID a := &service_diff.ApiDiff{ @@ -204,7 +204,7 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s Path: apiInfo.Path, Status: service_diff.Status{}, } - + pc, hasPc := targetApiProxy[apiId] dc, hasDC := targetAPIDoc[apiId] if !hasPc { @@ -215,12 +215,12 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s // 未设置文档 a.Status.Doc = service_diff.StatusUnset } - + if !baseApis.Has(apiId) { a.Change = service_diff.ChangeTypeNew } else { a.Change = service_diff.ChangeTypeNone - + baseProxy, hasBaseProxy := baseApiProxy[apiId] baseDoc, hasBaseDoc := baseAPIDoc[apiId] if hasBaseDoc != hasDC || hasBaseProxy != hasPc { @@ -232,7 +232,7 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s } } out.Apis = append(out.Apis, a) - + } baseApis.Remove(utils.SliceToSlice(out.Apis, func(i *service_diff.ApiDiff) string { return i.APi @@ -248,7 +248,7 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s Change: service_diff.ChangeTypeDelete, }) } - + } // upstream diff targetUpstreamMap := utils.SliceToMap(target.upstreamCommits, func(i *commit.Commit[upstream.Config]) string { @@ -257,12 +257,12 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s baseUpstreamMap := utils.SliceToMap(base.upstreamCommits, func(i *commit.Commit[upstream.Config]) string { return fmt.Sprintf("%s-%s", i.Target, i.Key) }) - + for _, partitionId := range partitions { key := fmt.Sprintf("%s-%s", target.id, partitionId) o := &service_diff.UpstreamDiff{ Upstream: target.id, - //Partition: partitionId, + //Cluster: partitionId, Data: nil, Change: service_diff.ChangeTypeNone, Status: 0, @@ -277,7 +277,7 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s } else if tu.UUID != bu.UUID { o.Change = service_diff.ChangeTypeUpdate } - + } else { o.Status = service_diff.StatusLoss if hasBu { @@ -285,12 +285,12 @@ func (m *imlServiceDiff) diff(partitions []string, base, target *projectInfo) *s } } } - + return out } func (m *imlServiceDiff) Out(ctx context.Context, diff *service_diff.Diff) (*DiffOut, error) { - + clusters, err := m.clusterService.List(ctx, diff.Clusters...) if err != nil { return nil, err @@ -298,7 +298,7 @@ func (m *imlServiceDiff) Out(ctx context.Context, diff *service_diff.Diff) (*Dif if len(clusters) == 0 { return nil, fmt.Errorf("unset gateway for clusters %v", diff.Clusters) } - + out := &DiffOut{} out.Apis = utils.SliceToSlice(diff.Apis, func(i *service_diff.ApiDiff) *ApiDiffOut { return &ApiDiffOut{ @@ -310,10 +310,10 @@ func (m *imlServiceDiff) Out(ctx context.Context, diff *service_diff.Diff) (*Dif Status: i.Status, } }) - + for _, u := range diff.Upstreams { typeValue := u.Data.Type - + if typeValue == "" { typeValue = "static" } diff --git a/plugins/core/core.go b/plugins/core/core.go index 647d14dd..7167e0be 100644 --- a/plugins/core/core.go +++ b/plugins/core/core.go @@ -1,6 +1,7 @@ package core import ( + "github.com/APIParkLab/APIPark/controller/monitor" "net/http" plugin_cluster "github.com/APIParkLab/APIPark/controller/plugin-cluster" @@ -54,13 +55,14 @@ func (d *Driver) Create() (pm3.IPlugin, error) { } type plugin struct { - clusterController cluster.IClusterController `autowired:""` - certificateController certificate.ICertificateController `autowired:""` - teamManagerController team_manager.ITeamManagerController `autowired:""` - myTeamController my_team.ITeamController `autowired:""` - appController service.IAppController `autowired:""` - serviceController service.IServiceController `autowired:""` - //serviceController service.IServiceController `autowired:""` + clusterController cluster.IClusterController `autowired:""` + certificateController certificate.ICertificateController `autowired:""` + teamManagerController team_manager.ITeamManagerController `autowired:""` + myTeamController my_team.ITeamController `autowired:""` + appController service.IAppController `autowired:""` + serviceController service.IServiceController `autowired:""` + monitorStatisticController monitor.IMonitorStatisticController `autowired:""` + monitorConfigController monitor.IMonitorConfigController `autowired:""` catalogueController catalogue.ICatalogueController `autowired:""` upstreamController upstream.IUpstreamController `autowired:""` apiController api.IAPIController `autowired:""` @@ -88,7 +90,7 @@ func (p *plugin) OnComplete() { p.apis = append(p.apis, p.projectAuthorizationApis()...) p.apis = append(p.apis, p.releaseApis()...) p.apis = append(p.apis, p.DynamicModuleApis()...) - + p.apis = append(p.apis, p.monitorStatisticApis()...) p.apis = append(p.apis, p.PartitionPluginApi()...) p.apis = append(p.apis, p.commonApis()...) } diff --git a/plugins/core/monitor.go b/plugins/core/monitor.go new file mode 100644 index 00000000..067ffe78 --- /dev/null +++ b/plugins/core/monitor.go @@ -0,0 +1,19 @@ +package core + +import ( + "net/http" + + "github.com/eolinker/go-common/pm3" +) + +func (p *plugin) monitorStatisticApis() []pm3.Api { + return []pm3.Api{ + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/overview/top10", []string{"context", "body"}, []string{"top10"}, p.monitorStatisticController.Top10), + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/overview/summary", []string{"context", "body"}, []string{"request_summary", "proxy_summary"}, p.monitorStatisticController.Summary), + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/overview/invoke", []string{"context", "body"}, []string{"date", "request_total", "proxy_total", "status_4xx", "status_5xx", "request_rate", "proxy_rate", "time_interval"}, p.monitorStatisticController.OverviewInvokeTrend), + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/overview/message", []string{"context", "body"}, []string{"date", "request_message", "response_message", "time_interval"}, p.monitorStatisticController.OverviewMessageTrend), + + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/partition/monitor", []string{"context", "body"}, []string{"info"}, p.monitorConfigController.SaveMonitorConfig), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/partition/monitor", []string{"context"}, []string{"info"}, p.monitorConfigController.GetMonitorConfig), + } +} diff --git a/service/monitor/iml.go b/service/monitor/iml.go new file mode 100644 index 00000000..713bd10d --- /dev/null +++ b/service/monitor/iml.go @@ -0,0 +1,257 @@ +package monitor + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strings" + "time" + + "github.com/eolinker/eosc/log" + + "github.com/eolinker/go-common/cache" + + "github.com/google/uuid" + + "gorm.io/gorm" + + "github.com/eolinker/go-common/utils" + + "github.com/APIParkLab/APIPark/stores/monitor" +) + +var ( + _ IMonitorService = (*imlMonitorService)(nil) +) + +type imlMonitorService struct { + store monitor.IMonitorStore `autowired:""` +} + +func (i *imlMonitorService) DeleteByPartition(ctx context.Context, partitionId string) error { + _, err := i.store.DeleteWhere(ctx, map[string]interface{}{ + "partition": partitionId, + }) + return err +} + +func (i *imlMonitorService) GetByCluster(ctx context.Context, partitionId string) (*Monitor, error) { + info, err := i.store.First(ctx, map[string]interface{}{ + "partition": partitionId, + }) + if err != nil { + return nil, err + } + return &Monitor{ + ID: info.UUID, + Cluster: info.Cluster, + Driver: info.Driver, + Config: info.Config, + Creator: info.Creator, + Updater: info.Updater, + CreateAt: info.CreateAt, + UpdateAt: info.UpdateAt, + }, nil +} + +func (i *imlMonitorService) Get(ctx context.Context, id string) (*Monitor, error) { + info, err := i.store.First(ctx, map[string]interface{}{ + "uuid": id, + }) + if err != nil { + return nil, err + } + return &Monitor{ + ID: info.UUID, + Cluster: info.Cluster, + Driver: info.Driver, + Config: info.Config, + Creator: info.Creator, + Updater: info.Updater, + CreateAt: info.CreateAt, + UpdateAt: info.UpdateAt, + }, nil +} + +func (i *imlMonitorService) MapByCluster(ctx context.Context, partitionIds ...string) (map[string]*Monitor, error) { + if len(partitionIds) == 0 { + return make(map[string]*Monitor), nil + } + list, err := i.store.List(ctx, map[string]interface{}{ + "partition": partitionIds, + }) + if err != nil { + return nil, err + } + return utils.SliceToMapO(list, func(m *monitor.Monitor) (string, *Monitor) { + return m.Cluster, &Monitor{ + ID: m.UUID, + Cluster: m.Cluster, + Driver: m.Driver, + Config: m.Config, + Creator: m.Creator, + Updater: m.Updater, + CreateAt: m.CreateAt, + UpdateAt: m.UpdateAt, + } + }), nil +} + +func (i *imlMonitorService) Save(ctx context.Context, m *SaveMonitor) error { + userId := utils.UserId(ctx) + now := time.Now() + info, err := i.store.First(ctx, map[string]interface{}{ + "partition": m.Cluster, + }) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + info = &monitor.Monitor{ + UUID: uuid.New().String(), + Cluster: m.Cluster, + Driver: m.Driver, + Config: m.Config, + Creator: userId, + Updater: userId, + CreateAt: now, + UpdateAt: now, + } + } else { + info.Config = m.Config + info.Updater = userId + info.UpdateAt = now + } + return i.store.Save(ctx, info) +} + +var ( + _ IMonitorStatisticsCache = (*imlMonitorStatisticsCacheService)(nil) +) + +type imlMonitorStatisticsCacheService struct { + commonCache cache.ICommonCache `autowired:""` +} + +func (i *imlMonitorStatisticsCacheService) GetStatisticsCache(ctx context.Context, partitionId string, start, end time.Time, groupBy string, wheres []MonWhereItem, limit int) (map[string]MonCommonData, error) { + key := fmt.Sprintf("monitor:statistics:%s:%d_%d:%s:%s:%d", partitionId, start.Unix(), end.Unix(), groupBy, formatWhereKey(wheres), limit) + + maps, err := i.commonCache.HGetAll(ctx, key) + if err != nil { + log.Errorf("GetStatisticsCache cache.HGetAll key=%s err=%s", key, err.Error()) + return nil, err + } + valMap := make(map[string]MonCommonData) + for k, v := range maps { + commonData := &MonCommonData{} + if err = json.Unmarshal([]byte(v), commonData); err != nil { + log.Errorf("GetStatisticsCache json.Unmarshal err=%s", err.Error()) + return nil, err + } + valMap[k] = *commonData + } + + return valMap, nil +} + +func (i *imlMonitorStatisticsCacheService) SetStatisticsCache(ctx context.Context, partitionId string, start, end time.Time, groupBy string, wheres []MonWhereItem, limit int, values map[string]MonCommonData) error { + key := fmt.Sprintf("monitor:statistics:%s:%d_%d:%s:%s:%d", partitionId, start.Unix(), end.Unix(), groupBy, formatWhereKey(wheres), limit) + + maps := make(map[string][]byte) + for k, data := range values { + bytes, err := json.Marshal(data) + if err != nil { + log.Errorf("SetStatisticsCache json.Marshal key=%s err=%s", key, err.Error()) + return err + } + maps[k] = bytes + } + + return i.commonCache.HMSet(ctx, key, maps, 5*time.Minute) +} + +func (i *imlMonitorStatisticsCacheService) GetTrendCache(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem) (*MonInvokeCountTrend, error) { + key := fmt.Sprintf("monitor:trend:%s:%d_%d:%s", partitionId, start.Unix(), end.Unix(), formatWhereKey(wheres)) + + bytes, err := i.commonCache.Get(ctx, key) + if err != nil { + log.Errorf("GetTrendCache cache.Get key=%s err=%s", key, err.Error()) + return nil, err + } + + val := new(MonInvokeCountTrend) + + if err = json.Unmarshal(bytes, val); err != nil { + log.Errorf("GetTrendCache json.Unmarshal key=%s bytes=%v err=%s", key, bytes, err.Error()) + return nil, err + } + + return val, nil +} + +func (i *imlMonitorStatisticsCacheService) SetTrendCache(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem, value *MonInvokeCountTrend) error { + key := fmt.Sprintf("monitor:trend:%s:%d_%d:%s", partitionId, start.Unix(), end.Unix(), formatWhereKey(wheres)) + + bytes, err := json.Marshal(value) + if err != nil { + log.Errorf("SetTrendCache json.Marshal key=%s val=%v err=%s", key, value, err.Error()) + return err + } + + return i.commonCache.Set(ctx, key, bytes, 5*time.Minute) +} + +func (i *imlMonitorStatisticsCacheService) GetMessageTrend(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem) (*MonMessageTrend, error) { + key := fmt.Sprintf("monitor:message_trend:%s:%d_%d:%s", partitionId, start.Unix(), end.Unix(), formatWhereKey(wheres)) + + bytes, err := i.commonCache.Get(ctx, key) + if err != nil { + log.Errorf("GetMessageTrend cache.Get key=%s err=%s", key, err.Error()) + return nil, err + } + + val := new(MonMessageTrend) + + if err = json.Unmarshal(bytes, val); err != nil { + log.Errorf("GetMessageTrend json.Unmarshal key=%s bytes=%v err=%s", key, bytes, err.Error()) + return nil, err + } + + return val, nil +} + +func (i *imlMonitorStatisticsCacheService) SetMessageTrend(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem, val *MonMessageTrend) error { + key := fmt.Sprintf("monitor:message_trend:%s:%d_%d:%s", partitionId, start.Unix(), end.Unix(), formatWhereKey(wheres)) + + bytes, err := json.Marshal(val) + if err != nil { + log.Errorf("SetMessageTrend json.Marshal key=%s val=%v err=%s", key, val, err.Error()) + return err + } + + return i.commonCache.Set(ctx, key, bytes, 5*time.Minute) +} + +func formatWhereKey(wheres []MonWhereItem) string { + + whereMap := make(map[string]MonWhereItem) + keys := make([]string, 0, len(wheres)) + for _, where := range wheres { + whereMap[where.Key] = where + keys = append(keys, where.Key) + } + + sort.Strings(keys) + + redisKeys := make([]string, 0) + for _, key := range keys { + if v, ok := whereMap[key]; ok { + sort.Strings(v.Values) + redisKeys = append(redisKeys, fmt.Sprintf("%v", strings.Join(v.Values, "_"))) + } + } + + return strings.Join(redisKeys, ":") +} diff --git a/service/monitor/model.go b/service/monitor/model.go new file mode 100644 index 00000000..6a6081f3 --- /dev/null +++ b/service/monitor/model.go @@ -0,0 +1,159 @@ +package monitor + +import "time" + +type CreateMonitorProxy struct { +} + +type UpdateMonitorProxy struct { +} + +type Monitor struct { + ID string + Cluster string + Driver string + Config string + Creator string + Updater string + CreateAt time.Time + UpdateAt time.Time +} + +type SaveMonitor struct { + Cluster string + Driver string + Config string +} + +type MonSortType string + +type MonCommonInput struct { + StartTime int64 + EndTime int64 + PartitionId string + Path string + Clusters []string + Ip string + Keyword string + ServiceName string + AppId string + ApiId string + ServiceNames []string + AppIds []string + ApiIds []string + PageNum int + PageSize int + Sort MonCommonSort +} + +type MonCommonSort struct { + Key MonSortType + Val bool +} + +// MonCommonStatistics 调用统计 +type MonCommonStatistics struct { + ApiId string //apiID + ApiName string //api名称 + ServiceID string //上游服务ID + ServiceName string //上游服务名称 + AppName string //应用名称 + AppId string //应用ID + Path string //路径 + ProxyPath string //转发路径 + Ip string //IP + Node string //目标节点 + IsRed bool //是否标红 + MonCommonData +} + +type MonPirMapInfo struct { + RequestTotal int `json:"request_total"` //请求总数 + RequestSuccess int `json:"request_success"` //请求成功数 + RequestFail int `json:"request_fail"` //请求失败数 + RequestStatus5XX int `json:"request_status_5_xx"` + RequestStatus4XX int `json:"request_status_4_xx"` + ProxyTotal int `json:"proxy_total"` //转发总数 + ProxySuccess int `json:"proxy_success"` //转发成功数 + ProxyFail int `json:"proxy_fail"` //转发失败数 + ProxyStatus5XX int `json:"proxy_status_5_xx"` + ProxyStatus4XX int `json:"proxy_status_4_xx"` +} + +type MonInvokeCountTrend struct { + Date []time.Time `json:"date"` + Status5XX []int64 `json:"status_5_xx"` + Status4XX []int64 `json:"status_4_xx"` + ProxyRate []float64 `json:"proxy_rate"` + ProxyTotal []int64 `json:"proxy_total"` + RequestRate []float64 `json:"request_rate"` + RequestTotal []int64 `json:"request_total"` +} + +// MonCommonData 通用字段 +type MonCommonData struct { + ID string `json:"id"` + RequestTotal int64 `json:"request_total"` //请求总数 + RequestSuccess int64 `json:"request_success"` //请求成功数 + RequestRate float64 `json:"request_rate"` //请求成功率 + ProxyTotal int64 `json:"proxy_total"` //转发总数 + ProxySuccess int64 `json:"proxy_success"` //转发成功数 + ProxyRate float64 `json:"proxy_rate"` //转发成功率 + StatusFail int64 `json:"status_fail"` //失败状态数 + AvgResp float64 `json:"avg_resp"` //平均响应时间 + MaxResp int64 `json:"max_resp"` //最大响应时间 + MinResp int64 `json:"min_resp"` //最小响应时间 + AvgTraffic float64 `json:"avg_traffic"` //平均流量 + MaxTraffic int64 `json:"max_traffic"` //最大流量 + MinTraffic int64 `json:"min_traffic"` //最小流量 +} + +type MonProxyData struct { + ProxyTotal int64 `json:"proxy_total"` //转发总数 + ProxySuccess int64 `json:"proxy_success"` //转发成功数 + ProxyRate float64 `json:"proxy_rate"` //转发成功率 + StatusFail int64 `json:"status_fail"` //失败状态数 + AvgResp float64 `json:"avg_resp"` //平均响应时间 + MaxResp int64 `json:"max_resp"` //最大响应时间 + MinResp int64 `json:"min_resp"` //最小响应时间 + AvgTraffic float64 `json:"avg_traffic"` //平均流量 + MaxTraffic int64 `json:"max_traffic"` //最大流量 + MinTraffic int64 `json:"min_traffic"` //最小流量 +} + +type MonMessageTrend struct { + Dates []time.Time `json:"dates"` + ReqMessage []float64 `json:"req_message"` + RespMessage []float64 `json:"resp_message"` +} + +type Summary struct { + Total int64 `json:"total"` + Success int64 `json:"success"` + Fail int64 `json:"fail"` + Status4Xx int64 `json:"status_4xx"` + Status5Xx int64 `json:"status_5xx"` +} + +type MonWhereItem struct { + Key string + Operation string // 表达式,默认为 =,多个为 in,可以用其他 + Values []string +} +type MonSortBy struct { + Key string + Desc bool +} +type MonStatisticsValue struct { + MonCommonData +} +type MonTrendFilter struct { + Name string + MonWhereItem +} + +type MonTrendValues struct { + Data []string + Names []string + Values [][]interface{} +} diff --git a/service/monitor/service.go b/service/monitor/service.go new file mode 100644 index 00000000..ed8f8566 --- /dev/null +++ b/service/monitor/service.go @@ -0,0 +1,53 @@ +package monitor + +import ( + "context" + "reflect" + "time" + + "github.com/eolinker/go-common/autowire" +) + +type IMonitorService interface { + // Get 获取监控配置 + Get(ctx context.Context, id string) (*Monitor, error) + // MapByCluster 获取监控配置 + MapByCluster(ctx context.Context, clusterIds ...string) (map[string]*Monitor, error) + GetByCluster(ctx context.Context, clusterId string) (*Monitor, error) + // Save 保存监控配置 + Save(ctx context.Context, monitor *SaveMonitor) error +} + +func init() { + autowire.Auto[IMonitorService](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorService)) + }) + + autowire.Auto[IMonitorStatisticsCache](func() reflect.Value { + return reflect.ValueOf(new(imlMonitorStatisticsCacheService)) + }) +} + +type IMonitorStatisticsCache interface { + GetStatisticsCache(ctx context.Context, partitionId string, start, end time.Time, groupBy string, wheres []MonWhereItem, limit int) (map[string]MonCommonData, error) + SetStatisticsCache(ctx context.Context, partitionId string, start, end time.Time, groupBy string, wheres []MonWhereItem, limit int, values map[string]MonCommonData) error + + GetTrendCache(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem) (*MonInvokeCountTrend, error) + SetTrendCache(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem, value *MonInvokeCountTrend) error + + //GetCircularMap(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem) (request, proxy *CircularDate, err error) + //SetCircularMap(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem, request, proxy *CircularDate) error + + GetMessageTrend(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem) (*MonMessageTrend, error) + SetMessageTrend(ctx context.Context, partitionId string, start, end time.Time, wheres []MonWhereItem, val *MonMessageTrend) error +} + +// IMonitorSourceDriver 监控数据源驱动 +type IMonitorSourceDriver interface { + CheckInput(config []byte) ([]byte, error) +} + +type IMonitorSourceManager interface { + //driver.IDriverManager[IMonitorSourceDriver] + List() []string +} diff --git a/service/service/iml.go b/service/service/iml.go index f0a79ea4..cbaf1acc 100644 --- a/service/service/iml.go +++ b/service/service/iml.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "time" - + "github.com/eolinker/go-common/utils" - + "github.com/eolinker/go-common/auto" - + "github.com/APIParkLab/APIPark/service/universally" "github.com/APIParkLab/APIPark/stores/service" ) @@ -25,6 +25,19 @@ type imlServiceService struct { universally.IServiceEdit[Edit] } +func (i *imlServiceService) ServiceList(ctx context.Context, serviceIds ...string) ([]*Service, error) { + w := make(map[string]interface{}) + if len(serviceIds) > 0 { + w["uuid"] = serviceIds + } + w["as_server"] = true + list, err := i.store.List(ctx, w) + if err != nil { + return nil, err + } + return utils.SliceToSlice(list, FromEntity), nil +} + func (i *imlServiceService) SearchPublicServices(ctx context.Context, keyword string) ([]*Service, error) { w := map[string]interface{}{ "as_server": true, @@ -102,11 +115,11 @@ func (i *imlServiceService) GetLabels(ctx context.Context, ids ...string) map[st func (i *imlServiceService) OnComplete() { i.IServiceGet = universally.NewGetSoftDelete[Service, service.Service](i.store, FromEntity) - + i.IServiceDelete = universally.NewSoftDelete[service.Service](i.store) - + i.IServiceCreate = universally.NewCreatorSoftDelete[Create, service.Service](i.store, "service", createEntityHandler, uniquestHandler, labelHandler) - + i.IServiceEdit = universally.NewEdit[Edit, service.Service](i.store, updateHandler, labelHandler) auto.RegisterService("service", i) } @@ -151,5 +164,5 @@ func updateHandler(e *service.Service, i *Edit) { if i.Logo != nil { e.Logo = *i.Logo } - + } diff --git a/service/service/service.go b/service/service/service.go index 2373ba12..1f7dae45 100644 --- a/service/service/service.go +++ b/service/service/service.go @@ -3,7 +3,7 @@ package service import ( "context" "reflect" - + "github.com/APIParkLab/APIPark/service/universally" "github.com/eolinker/go-common/autowire" ) @@ -17,6 +17,7 @@ type IServiceService interface { AppCountByTeam(ctx context.Context, teamId ...string) (map[string]int64, error) SearchPublicServices(ctx context.Context, keyword string) ([]*Service, error) Check(ctx context.Context, id string, rule map[string]bool) (*Service, error) + ServiceList(ctx context.Context, serviceIds ...string) ([]*Service, error) AppList(ctx context.Context, appIds ...string) ([]*Service, error) } diff --git a/service/service_diff/diff.go b/service/service_diff/diff.go index 4fd12621..1ecd828d 100644 --- a/service/service_diff/diff.go +++ b/service/service_diff/diff.go @@ -32,7 +32,7 @@ type UpstreamConfig struct { } type UpstreamDiff struct { Upstream string `json:"upstream,omitempty" ` - //Partition string `json:"partition,omitempty"` + //Cluster string `json:"partition,omitempty"` Data *upstream.Config `json:"data,omitempty"` Change ChangeType `json:"change,omitempty"` Status StatusType `json:"status,omitempty"` diff --git a/stores/monitor/model.go b/stores/monitor/model.go new file mode 100644 index 00000000..0ae09b10 --- /dev/null +++ b/stores/monitor/model.go @@ -0,0 +1,23 @@ +package monitor + +import "time" + +type Monitor struct { + Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` + UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` + Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"` + Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` + Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"` + Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` + Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` + CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"` + UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"` +} + +func (c *Monitor) IdValue() int64 { + return c.Id +} + +func (c *Monitor) TableName() string { + return "monitor" +} diff --git a/stores/monitor/store.go b/stores/monitor/store.go new file mode 100644 index 00000000..e6faddc8 --- /dev/null +++ b/stores/monitor/store.go @@ -0,0 +1,22 @@ +package monitor + +import ( + "reflect" + + "github.com/eolinker/go-common/autowire" + "github.com/eolinker/go-common/store" +) + +type IMonitorStore interface { + store.IBaseStore[Monitor] +} + +type storeMonitor struct { + store.Store[Monitor] +} + +func init() { + autowire.Auto[IMonitorStore](func() reflect.Value { + return reflect.ValueOf(new(storeMonitor)) + }) +}