From 2195ff900f27e0b76f65d984f5ac4a502276cbda Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Thu, 5 Dec 2024 14:39:57 +0800 Subject: [PATCH] data mask log commit --- controller/log/controller.go | 21 ++ controller/log/iml.go | 19 ++ controller/strategy/iml.go | 44 +++ controller/strategy/strategy.go | 3 + gateway/apinto/client.go | 8 +- gateway/apinto/plugin/apinto_plugin.yml | 7 + gateway/apinto/strategy.go | 52 ++++ gateway/client.go | 1 + gateway/profession.go | 1 + gateway/resource.go | 22 +- go.mod | 6 +- go.sum | 4 +- log-driver/driver.go | 41 +++ log-driver/entity.go | 25 ++ log-driver/factory.go | 39 +++ log-driver/loki/entity.go | 72 +++++ log-driver/loki/loki.go | 273 ++++++++++++++++++ log-driver/loki/loki_test.go | 95 ++++++ module/log/dto/input.go | 7 + module/log/dto/output.go | 14 + module/log/iml.go | 62 ++++ module/log/module.go | 21 ++ module/publish/iml.go | 221 ++++++-------- module/release/iml.go | 14 +- module/service-diff/iml.go | 14 +- module/service-diff/out.go | 2 +- module/strategy/driver/data-masking/driver.go | 29 +- module/strategy/driver/driver.go | 4 +- module/strategy/dto/output.go | 26 +- module/strategy/iml.go | 119 +++++++- module/strategy/module.go | 4 + plugins/core/core.go | 4 + plugins/core/log.go | 14 + plugins/core/strategy.go | 2 + resources/plugin/plugin.yml | 8 +- ..._com_apinto_strategy-plugin-data_mask.json | 1 + resources/plugin/render/render.init.sh | 1 + service/cluster/cluster.go | 9 +- service/cluster/model.go | 4 +- service/log/iml.go | 156 ++++++++++ service/log/model.go | 57 ++++ service/log/service.go | 25 ++ service/strategy/iml.go | 15 +- service/strategy/model.go | 3 +- service/strategy/service.go | 10 +- stores/cluster/model.go | 8 +- stores/log-source/model.go | 23 ++ stores/log-source/store.go | 22 ++ 48 files changed, 1442 insertions(+), 190 deletions(-) create mode 100644 controller/log/controller.go create mode 100644 controller/log/iml.go create mode 100644 gateway/apinto/strategy.go create mode 100644 log-driver/driver.go create mode 100644 log-driver/entity.go create mode 100644 log-driver/factory.go create mode 100644 log-driver/loki/entity.go create mode 100644 log-driver/loki/loki.go create mode 100644 log-driver/loki/loki_test.go create mode 100644 module/log/dto/input.go create mode 100644 module/log/dto/output.go create mode 100644 module/log/iml.go create mode 100644 module/log/module.go create mode 100644 plugins/core/log.go create mode 100644 resources/plugin/render/eolinker_com_apinto_strategy-plugin-data_mask.json create mode 100644 service/log/iml.go create mode 100644 service/log/model.go create mode 100644 service/log/service.go create mode 100644 stores/log-source/model.go create mode 100644 stores/log-source/store.go diff --git a/controller/log/controller.go b/controller/log/controller.go new file mode 100644 index 00000000..1c6b68c7 --- /dev/null +++ b/controller/log/controller.go @@ -0,0 +1,21 @@ +package log + +import ( + "reflect" + + log_dto "github.com/APIParkLab/APIPark/module/log/dto" + "github.com/eolinker/go-common/autowire" + "github.com/gin-gonic/gin" +) + +type ILogController interface { + Save(ctx *gin.Context, driver string, input *log_dto.Save) error + Get(ctx *gin.Context, driver string) (*log_dto.LogSource, error) +} + +func init() { + logController := &imlLogController{} + autowire.Auto[ILogController](func() reflect.Value { + return reflect.ValueOf(logController) + }) +} diff --git a/controller/log/iml.go b/controller/log/iml.go new file mode 100644 index 00000000..4aa1b9ea --- /dev/null +++ b/controller/log/iml.go @@ -0,0 +1,19 @@ +package log + +import ( + "github.com/APIParkLab/APIPark/module/log" + log_dto "github.com/APIParkLab/APIPark/module/log/dto" + "github.com/gin-gonic/gin" +) + +type imlLogController struct { + module log.ILogModule `autowired:""` +} + +func (c *imlLogController) Save(ctx *gin.Context, driver string, input *log_dto.Save) error { + return c.module.Save(ctx, driver, input) +} + +func (c *imlLogController) Get(ctx *gin.Context, driver string) (*log_dto.LogSource, error) { + return c.module.Get(ctx, driver) +} diff --git a/controller/strategy/iml.go b/controller/strategy/iml.go index 6c35e2da..766af947 100644 --- a/controller/strategy/iml.go +++ b/controller/strategy/iml.go @@ -211,3 +211,47 @@ func (i *imlStrategyController) DisableStrategy(ctx *gin.Context, id string) err func (i *imlStrategyController) DeleteStrategy(ctx *gin.Context, id string) error { return i.strategyModule.Delete(ctx, id) } + +func (i *imlStrategyController) GetStrategyLogs(ctx *gin.Context, keyword string, strategyId string, start string, end string, limit string, offset string) ([]*strategy_dto.LogItem, int64, error) { + now := time.Now() + s, err := time.ParseInLocation("2006-01-02 15:04:05", start, time.Local) + if err != nil { + if start == "" { + s = now.Add(-time.Hour * 24 * 30) + } else { + return nil, 0, fmt.Errorf("start time error: %s", err) + } + } + e, err := time.ParseInLocation("2006-01-02 15:04:05", end, time.Local) + if err != nil { + if end == "" { + e = now + } else { + return nil, 0, fmt.Errorf("end time error: %s", err) + } + } + if s.After(e) { + return nil, 0, fmt.Errorf("start time must be less than end time") + } + l, err := strconv.ParseInt(limit, 10, 64) + if err != nil && limit != "" { + + return nil, 0, err + } + o, err := strconv.ParseInt(offset, 10, 64) + if err != nil && offset != "" { + return nil, 0, err + } + if l < 1 { + l = 15 + } + if o < 1 { + o = 1 + } + return i.strategyModule.GetStrategyLogs(ctx, keyword, strategyId, s, e, l, o) +} + +func (i *imlStrategyController) LogInfo(ctx *gin.Context, id string) (*strategy_dto.LogInfo, error) { + + return i.strategyModule.StrategyLogInfo(ctx, id) +} diff --git a/controller/strategy/strategy.go b/controller/strategy/strategy.go index ad0f4fac..cfee47ff 100644 --- a/controller/strategy/strategy.go +++ b/controller/strategy/strategy.go @@ -33,6 +33,9 @@ type IStrategyController interface { FilterServiceRemote(ctx *gin.Context, serviceId string, name string) ([]*strategy_dto.Title, []any, int64, string, string, error) ToPublish(ctx *gin.Context, driver string) ([]*strategy_dto.ToPublishItem, string, string, bool, error) + + GetStrategyLogs(ctx *gin.Context, keyword string, strategyId string, start string, end string, limit string, offset string) ([]*strategy_dto.LogItem, int64, error) + LogInfo(ctx *gin.Context, id string) (*strategy_dto.LogInfo, error) } type IStrategyCommonController interface { diff --git a/gateway/apinto/client.go b/gateway/apinto/client.go index 9c292ecb..4c531891 100644 --- a/gateway/apinto/client.go +++ b/gateway/apinto/client.go @@ -3,7 +3,7 @@ package apinto import ( "context" "strings" - + "github.com/APIParkLab/APIPark/gateway" admin_client "github.com/eolinker/eosc/process-admin/client" ) @@ -14,6 +14,10 @@ type ClientDriver struct { client admin_client.Client } +func (c *ClientDriver) Strategy() gateway.IStrategyClient { + return NewStrategyClient(c.client) +} + func (c *ClientDriver) Close(ctx context.Context) error { if c.client != nil { return c.client.Close() @@ -74,7 +78,7 @@ func NewClientDriver(cfg *gateway.ClientConfig) (*ClientDriver, error) { } func genWorkerID(id string, profession string) string { - + suffix := "@" + profession if strings.HasSuffix(id, suffix) { return id diff --git a/gateway/apinto/plugin/apinto_plugin.yml b/gateway/apinto/plugin/apinto_plugin.yml index 629bddb5..4bd3765a 100644 --- a/gateway/apinto/plugin/apinto_plugin.yml +++ b/gateway/apinto/plugin/apinto_plugin.yml @@ -63,6 +63,13 @@ rely: eolinker.com:apinto:plugin_app config: cache: redis@output + +- id: eolinker.com:apinto:strategy-plugin-data_mask + name: strategy_data_mask + status: global + rely: eolinker.com:apinto:plugin_app + config: + cache: redis@output - id: eolinker.com:apinto:ai_prompt name: ai_prompt diff --git a/gateway/apinto/strategy.go b/gateway/apinto/strategy.go new file mode 100644 index 00000000..8c4ccecc --- /dev/null +++ b/gateway/apinto/strategy.go @@ -0,0 +1,52 @@ +package apinto + +import ( + "context" + + "github.com/eolinker/eosc" + + "github.com/APIParkLab/APIPark/gateway" + admin_client "github.com/eolinker/eosc/process-admin/client" +) + +var _ gateway.IStrategyClient = &StrategyClient{} + +type StrategyClient struct { + client admin_client.Client +} + +func (s *StrategyClient) Online(ctx context.Context, resources ...*eosc.Base[gateway.StrategyRelease]) error { + s.client.Begin(ctx) + for _, r := range resources { + if r.Config.IsDelete { + err := s.client.Del(ctx, genWorkerID(r.Config.Name, gateway.ProfessionStrategy)) + if err != nil { + s.client.Rollback(ctx) + return err + } + continue + } + err := s.client.Set(ctx, genWorkerID(r.Config.Name, gateway.ProfessionStrategy), r) + if err != nil { + s.client.Rollback(ctx) + return err + } + } + return s.client.Commit(ctx) +} + +func (s *StrategyClient) Offline(ctx context.Context, resources ...*eosc.Base[gateway.StrategyRelease]) error { + s.client.Begin(ctx) + for _, r := range resources { + err := s.client.Del(ctx, genWorkerID(r.Config.Name, gateway.ProfessionStrategy)) + if err != nil { + s.client.Rollback(ctx) + return err + } + } + return s.client.Commit(ctx) +} + +func NewStrategyClient(client admin_client.Client) *StrategyClient { + return &StrategyClient{client: client} +} diff --git a/gateway/client.go b/gateway/client.go index 56b17cdc..293ffa2b 100644 --- a/gateway/client.go +++ b/gateway/client.go @@ -21,6 +21,7 @@ type IClientDriver interface { Application() IApplicationClient Service() IServiceClient Subscribe() ISubscribeClient + Strategy() IStrategyClient Dynamic(resource string) (IDynamicClient, error) PluginSetting() IPluginSetting Commit(ctx context.Context) error diff --git a/gateway/profession.go b/gateway/profession.go index 1d83480e..3596f388 100644 --- a/gateway/profession.go +++ b/gateway/profession.go @@ -5,6 +5,7 @@ const ( ProfessionCertificate = "certificate" ProfessionRouter = "router" ProfessionApplication = "app" + ProfessionStrategy = "strategy" ProfessionService = "service" ProfessionAIProvider = "ai-provider" ) diff --git a/gateway/resource.go b/gateway/resource.go index a1fd9a0c..85931ead 100644 --- a/gateway/resource.go +++ b/gateway/resource.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" + "github.com/eolinker/eosc" + "github.com/APIParkLab/APIPark/model/plugin_model" ) @@ -15,6 +17,8 @@ type IServiceClient IResourceClient[ServiceRelease] type ISubscribeClient IResourceClient[SubscribeRelease] +type IStrategyClient IResourceClient[eosc.Base[StrategyRelease]] + type IResourceClient[T any] interface { Online(ctx context.Context, resources ...*T) error Offline(ctx context.Context, resources ...*T) error @@ -27,10 +31,11 @@ type IDynamicClient interface { } type ProjectRelease struct { - Id string `json:"id"` - Version string `json:"version"` - Apis []*ApiRelease `json:"apis"` - Upstream *UpstreamRelease `json:"upstreams"` + Id string `json:"id"` + Version string `json:"version"` + Apis []*ApiRelease `json:"apis"` + Upstream *UpstreamRelease `json:"upstreams"` + Strategies []*eosc.Base[StrategyRelease] `json:"strategies"` } type ApiRelease struct { @@ -73,6 +78,15 @@ type UpstreamRelease struct { Labels map[string]string } +type StrategyRelease struct { + Name string `json:"name"` + Desc string `json:"description"` + Driver string `json:"driver"` + Priority int `json:"priority"` + Filters map[string][]string `json:"filters"` + IsDelete bool `json:"-"` +} + type MatchRule struct { Position string MatchType string diff --git a/go.mod b/go.mod index 79c4402d..6b967907 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.21 require ( github.com/eolinker/ap-account v1.0.15 - github.com/eolinker/eosc v0.17.3 + github.com/eolinker/eosc v0.18.2 github.com/eolinker/go-common v1.1.1 github.com/gabriel-vasile/mimetype v1.4.4 github.com/getkin/kin-openapi v0.127.0 @@ -75,6 +75,10 @@ require ( gorm.io/driver/mysql v1.5.2 // indirect ) +replace ( + github.com/eolinker/eosc => ../../eolinker/eosc +) + //replace github.com/eolinker/ap-account => ../aoaccount // //replace github.com/eolinker/go-common => ../go-common diff --git a/go.sum b/go.sum index 525ea006..6344a0ec 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eolinker/ap-account v1.0.15 h1:n6DJeL6RHZ8eLlZUcY2U3H4d/GPaA5oelAx3R0E6yL8= github.com/eolinker/ap-account v1.0.15/go.mod h1:zm/Ivs6waJ/M/nEszhpPmM6g50y/MKO+5eABFAdeD0g= -github.com/eolinker/eosc v0.17.3 h1:sr2yT+v/AsqEdciRaaZZj0zL9pTufR5RvDW6+65hraQ= -github.com/eolinker/eosc v0.17.3/go.mod h1:xgq816hpanlMXFtZw7Ztdctb1eEk9UPHchY4NfFO6Cw= +github.com/eolinker/eosc v0.18.2 h1:fpPCS3jLzDEjMfYasIVr8rdjiy4yF7tohm5EEYF0TRw= +github.com/eolinker/eosc v0.18.2/go.mod h1:O9PQQXFCpB6fjHf+oFt/LN6EOAv779ItbMixMKCfTfk= github.com/eolinker/go-common v1.1.1 h1:3WqqecGqcHDgpa8Ljp156c1uWeZKP1CKScdU+6sOfcc= github.com/eolinker/go-common v1.1.1/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4= github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I= diff --git a/log-driver/driver.go b/log-driver/driver.go new file mode 100644 index 00000000..f3f35329 --- /dev/null +++ b/log-driver/driver.go @@ -0,0 +1,41 @@ +package log_driver + +import ( + "time" + + "github.com/eolinker/eosc" +) + +type ILogDriver interface { + LogInfo(clusterId string, id string) (*LogInfo, error) + LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) + Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error) +} + +var ( + driverManager = NewDriverManager() +) + +type DriverManager struct { + drivers eosc.Untyped[string, ILogDriver] +} + +func NewDriverManager() *DriverManager { + return &DriverManager{drivers: eosc.BuildUntyped[string, ILogDriver]()} +} + +func (m *DriverManager) Set(name string, driver ILogDriver) { + m.drivers.Set(name, driver) +} + +func (m *DriverManager) Get(name string) (ILogDriver, bool) { + return m.drivers.Get(name) +} + +func SetDriver(name string, driver ILogDriver) { + driverManager.Set(name, driver) +} + +func GetDriver(name string) (ILogDriver, bool) { + return driverManager.Get(name) +} diff --git a/log-driver/entity.go b/log-driver/entity.go new file mode 100644 index 00000000..cad3edd0 --- /dev/null +++ b/log-driver/entity.go @@ -0,0 +1,25 @@ +package log_driver + +import ( + "time" +) + +type Log struct { + ID string + Service string + Method string + Url string + RemoteIP string + Consumer string + Authorization string + RecordTime time.Time +} + +type LogInfo struct { + ID string + ContentType string + RequestBody string + ProxyBody string + ProxyResponseBody string + ResponseBody string +} diff --git a/log-driver/factory.go b/log-driver/factory.go new file mode 100644 index 00000000..49bc3855 --- /dev/null +++ b/log-driver/factory.go @@ -0,0 +1,39 @@ +package log_driver + +import "github.com/eolinker/eosc" + +var ( + defaultFactoryManager = NewFactoryManager() +) + +type IFactory interface { + Create(config string) (ILogDriver, error) +} + +type factoryManager struct { + factories eosc.Untyped[string, IFactory] +} + +func NewFactoryManager() *factoryManager { + return &factoryManager{factories: eosc.BuildUntyped[string, IFactory]()} +} + +func (m *factoryManager) Set(name string, factory IFactory) { + m.factories.Set(name, factory) +} + +func (m *factoryManager) Get(name string) (IFactory, bool) { + return m.factories.Get(name) +} + +func RegisterFactory(name string, factory IFactory) { + defaultFactoryManager.Set(name, factory) +} + +func GetFactory(name string) (IFactory, bool) { + return defaultFactoryManager.Get(name) +} + +func Drivers() []string { + return defaultFactoryManager.factories.Keys() +} diff --git a/log-driver/loki/entity.go b/log-driver/loki/entity.go new file mode 100644 index 00000000..9930a170 --- /dev/null +++ b/log-driver/loki/entity.go @@ -0,0 +1,72 @@ +package loki + +import ( + "fmt" + "net/url" +) + +type DriverConfig struct { + URL string `json:"url"` + Header map[string]string `json:"headers"` +} + +func (d *DriverConfig) Check() error { + if d.URL == "" { + return fmt.Errorf("url is empty") + } + u, err := url.Parse(d.URL) + if err != nil { + return err + } + if u.Host == "" { + return fmt.Errorf("host is empty") + } + if u.Scheme == "" { + u.Scheme = "http" + } + d.URL = fmt.Sprintf("%s://%s", u.Scheme, u.Host) + return nil +} + +type Response[T any] struct { + Data *Data[T] `json:"data"` + Status string `json:"status"` +} + +type Data[T any] struct { + ResultType string `json:"resultType"` + Result []*T `json:"result"` +} + +type LogCount struct { + Metric map[string]string `json:"metric"` + Value []interface{} `json:"value"` +} + +type LogInfo struct { + Stream *LogDetail `json:"stream"` +} + +type LogDetail struct { + Api string `json:"api"` + Application string `json:"application"` + BlockName string `json:"block_name"` + ContentType string `json:"content_type"` + Cluster string `json:"cluster"` + Msec string `json:"msec"` + Node string `json:"node"` + RequestId string `json:"request_id"` + RequestMethod string `json:"request_method"` + RequestScheme string `json:"request_scheme"` + RequestTime string `json:"request_time"` + RequestUri string `json:"request_uri"` + RequestBody string `json:"request_body"` + ProxyBody string `json:"proxy_body"` + ResponseBody string `json:"response_body"` + ProxyResponseBody string `json:"proxy_response_body"` + Service string `json:"service"` + Provider string `json:"provider"` + Auth string `json:"auth"` + SrcIp string `json:"src_ip"` + Status string `json:"status"` +} diff --git a/log-driver/loki/loki.go b/log-driver/loki/loki.go new file mode 100644 index 00000000..d3db0865 --- /dev/null +++ b/log-driver/loki/loki.go @@ -0,0 +1,273 @@ +package loki + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + log_driver "github.com/APIParkLab/APIPark/log-driver" +) + +func init() { + log_driver.RegisterFactory("loki", &factory{}) +} + +type factory struct { +} + +func (f *factory) Create(config string) (log_driver.ILogDriver, error) { + + return NewDriver(config) +} + +var ( + client = http.Client{} +) + +type Driver struct { + url string + headers map[string]string +} + +func NewDriver(config string) (*Driver, error) { + cfg := new(DriverConfig) + err := json.Unmarshal([]byte(config), cfg) + if err != nil { + return nil, err + } + err = cfg.Check() + if err != nil { + return nil, err + } + return &Driver{ + url: cfg.URL, + headers: cfg.Header, + }, nil +} + +func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, error) { + if id == "" { + return nil, fmt.Errorf("id is empty") + } + queries := url.Values{} + queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json | request_id = `%s`", clusterId, id)) + now := time.Now() + start := now.Add(-time.Hour * 24 * 30) + queries.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + queries.Set("end", strconv.FormatInt(now.UnixNano(), 10)) + queries.Set("limit", "1") + list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "") + if err != nil { + return nil, err + } + if len(list) < 1 || list[0].Stream == nil { + return nil, fmt.Errorf("no log found") + } + stream := list[0].Stream + return &log_driver.LogInfo{ + ID: stream.RequestId, + ContentType: stream.ContentType, + RequestBody: stream.RequestBody, + ProxyBody: stream.ProxyBody, + ProxyResponseBody: stream.ProxyResponseBody, + ResponseBody: stream.ResponseBody, + }, nil +} + +func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) { + + cs := make([]string, 0, len(conditions)) + for k, v := range conditions { + if strings.HasPrefix(k, "#") { + cs = append(cs, v) + continue + } + cs = append(cs, fmt.Sprintf("%s=\"%s\"", k, v)) + } + tmpCondition := "" + if len(conditions) > 0 { + tmpCondition = "|" + strings.Join(cs, "|") + } + queries := url.Values{} + queries.Set("query", fmt.Sprintf("sum(count_over_time({cluster=\"%s\"} %s [%dh])) by (%s)", clusterId, tmpCondition, spendHour, group)) + list, err := send[LogCount](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query", d.url), d.headers, queries, "") + if err != nil { + return nil, err + } + result := make(map[string]int64) + for _, l := range list { + if len(l.Value) != 2 { + continue + } + value, ok := l.Value[1].(string) + if !ok { + continue + } + v, err := strconv.ParseInt(value, 10, 64) + if err != nil { + continue + } + result[l.Metric[group]] = v + } + return result, nil +} + +func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) { + if start.After(end) { + return nil, 0, fmt.Errorf("start time is greater than end time") + } + if len(conditions) < 1 { + return nil, 0, fmt.Errorf("conditions is empty") + } + if offset < 1 { + offset = 1 + } + if limit < 1 { + limit = 15 + } + count, err := d.logCount(clusterId, conditions, start, end) + if err != nil { + return nil, 0, err + } + if count == 0 { + return nil, 0, nil + } + if count < (offset-1)*limit { + return nil, 0, fmt.Errorf("offset is greater than count") + } + cs := make([]string, 0, len(conditions)) + for k, v := range conditions { + if strings.HasPrefix(v, "#") { + cs = append(cs, v) + continue + } + cs = append(cs, fmt.Sprintf("%s=~\"%s\"", k, v)) + } + queries := url.Values{} + queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json | %s", clusterId, strings.Join(cs, " | "))) + queries.Set("limit", strconv.FormatInt(limit, 10)) + queries.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + logs, err := d.recuseLogs(queries, end, offset) + if err != nil { + return nil, 0, err + } + + return logs, count, nil +} + +func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) { + queries.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "") + if err != nil { + return nil, err + } + if len(list) < 1 { + return nil, nil + } + if offset > 1 { + // 获取list最后一个元素的时间戳 + last := list[len(list)-1].Stream + if last == nil { + return nil, fmt.Errorf("last log is empty") + } + msec, err := strconv.ParseInt(last.Msec, 10, 64) + if err != nil { + return nil, fmt.Errorf("parse last log time error: %v", err) + } + return d.recuseLogs(queries, time.UnixMilli(msec), offset-1) + } + logs := make([]*log_driver.Log, 0, len(list)) + for _, l := range list { + if l.Stream == nil { + continue + } + detail := l.Stream + msec, _ := strconv.ParseInt(detail.Msec, 10, 64) + + logs = append(logs, &log_driver.Log{ + ID: detail.RequestId, + Service: detail.Provider, + Method: detail.RequestMethod, + Url: detail.RequestUri, + RemoteIP: detail.SrcIp, + Consumer: detail.Application, + Authorization: detail.Auth, + RecordTime: time.UnixMilli(msec), + }) + } + return logs, nil +} + +func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) { + // 先查在这段时间内符合条件的日志数量 + queries := url.Values{} + queries.Add("start", strconv.FormatInt(start.UnixNano(), 10)) + queries.Add("end", strconv.FormatInt(end.UnixNano(), 10)) + cs := make([]string, 0, len(conditions)) + for k, v := range conditions { + if strings.HasPrefix(k, "#") { + cs = append(cs, v) + continue + } + cs = append(cs, fmt.Sprintf("%s=\"%s\"", k, v)) + } + tmpCondition := "" + if len(conditions) > 0 { + tmpCondition = "|" + strings.Join(cs, "|") + } + queries.Set("query", fmt.Sprintf("sum(count_over_time({cluster=\"%s\"} %s [720h]))", clusterId, tmpCondition)) + list, err := send[LogCount](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query", d.url), d.headers, queries, "") + if err != nil { + return 0, err + } + if len(list) < 1 || len(list[0].Value) < 2 { + return 0, nil + } + value, ok := list[0].Value[1].(string) + if !ok { + return 0, nil + } + v, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, err + } + return v, nil +} + +func send[T any](method string, uri string, headers map[string]string, queries url.Values, body string) ([]*T, error) { + if queries != nil && len(queries) > 0 { + uri = fmt.Sprintf("%s?%s", uri, queries.Encode()) + } + req, err := http.NewRequest(method, uri, strings.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w,uri is %s", err, uri) + } + for key, value := range headers { + req.Header.Set(key, value) + } + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + respData, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode > 399 { + return nil, fmt.Errorf("failed to send request: %s,body is %s", resp.Status, string(respData)) + } + + result := new(Response[T]) + err = json.Unmarshal(respData, result) + if err != nil { + return nil, fmt.Errorf("failed to decode response: %w,body is %s", err, string(respData)) + } + return result.Data.Result, nil +} diff --git a/log-driver/loki/loki_test.go b/log-driver/loki/loki_test.go new file mode 100644 index 00000000..61f2133e --- /dev/null +++ b/log-driver/loki/loki_test.go @@ -0,0 +1,95 @@ +package loki + +import ( + "testing" + "time" +) + +func TestLoki(t *testing.T) { + d, err := NewDriver(`{"url":"http://localhost:3100","header":{"Content-Type":"application/json","X-Scope-OrgID":"tenant1"}}`) + if err != nil { + t.Fatalf("failed to create driver: %v", err) + } + logCountResult, err := d.LogCount("apinto", nil, 720, "block_name") + if err != nil { + t.Fatalf("failed to get log count: %v", err) + } + t.Log(logCountResult) + logs, count, err := d.Logs("apinto", map[string]string{"block_name": "03899736-5d79-4f26-bd6a-c312a5880780"}, time.Now().Add(-time.Hour*24), time.Now(), 1, 1) + if err != nil { + t.Fatalf("failed to get logs: %v", err) + } + t.Log(logs, count) + info, err := d.LogInfo("apinto", "c9f6b19c-7dfe-496b-9b39-4d049232fe95") + if err != nil { + t.Fatalf("failed to get log info: %v", err) + } + t.Log(info) +} + +//func TestLokiLog(t *testing.T) { +// +// headers := make(map[string]string) +// headers["Content-Type"] = "application/json" +// headers["X-Scope-OrgID"] = "tenant1" +// queries := url.Values{} +// queries.Set("query", "{cluster=\"apinto\"} | json | request_id = `c9f6b19c-7dfe-496b-9b39-4d049232fe95`") +// now := time.Now() +// start := now.Add(-time.Hour * 24 * 30) +// queries.Set("start", strconv.FormatInt(start.UnixNano(), 10)) +// queries.Set("end", strconv.FormatInt(now.UnixNano(), 10)) +// queries.Set("limit", "100") +// a := time.Now() +// result, err := send[LogInfo](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "") +// if err != nil { +// t.Fatalf("failed to send request: %v", err) +// } +// t.Log(time.Now().Sub(a)) +// data, err := json.Marshal(result) +// if err != nil { +// t.Fatalf("failed to marshal data: %v", err) +// } +// t.Log(string(data)) +//} +// +//func TestLokiLogCount(t *testing.T) { +// headers := make(map[string]string) +// headers["Content-Type"] = "application/json" +// headers["X-Scope-OrgID"] = "tenant1" +// queries := url.Values{} +// //queries.Set("query", "sum(count_over_time({cluster=\"apinto\"}[24h])) by (block_name)") +// queries.Set("query", "sum(count_over_time({cluster=\"apinto\"}[24h]))") +// result, err := send[LogCount](http.MethodGet, "http://localhost:3100/loki/api/v1/query", headers, queries, "") +// if err != nil { +// t.Fatalf("failed to send request: %v", err) +// } +// data, err := json.Marshal(result) +// if err != nil { +// t.Fatalf("failed to marshal data: %v", err) +// } +// t.Log(string(data)) +//} +// +//func TestLokiLogs(t *testing.T) { +// headers := make(map[string]string) +// headers["Content-Type"] = "application/json" +// headers["X-Scope-OrgID"] = "tenant1" +// queries := url.Values{} +// queries.Set("query", "{cluster=\"apinto\"} | json | block_name=\"03899736-5d79-4f26-bd6a-c312a5880780\"") +// now := time.Now() +// start := now.Add(-time.Hour * 24 * 30) +// queries.Set("start", strconv.FormatInt(start.UnixNano(), 10)) +// queries.Set("end", strconv.FormatInt(now.UnixNano(), 10)) +// queries.Set("limit", "1") +// now = time.Now() +// result, err := send[map[string]interface{}](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "") +// t.Log(time.Now().Sub(now)) +// if err != nil { +// t.Fatalf("failed to send request: %v", err) +// } +// data, err := json.Marshal(result) +// if err != nil { +// t.Fatalf("failed to marshal data: %v", err) +// } +// t.Log(string(data)) +//} diff --git a/module/log/dto/input.go b/module/log/dto/input.go new file mode 100644 index 00000000..6d3254c0 --- /dev/null +++ b/module/log/dto/input.go @@ -0,0 +1,7 @@ +package log_dto + +type Save struct { + ID string `json:"id"` + Cluster string `json:"cluster"` + Config map[string]interface{} `json:"config"` +} diff --git a/module/log/dto/output.go b/module/log/dto/output.go new file mode 100644 index 00000000..e6f7cb47 --- /dev/null +++ b/module/log/dto/output.go @@ -0,0 +1,14 @@ +package log_dto + +import ( + "github.com/eolinker/go-common/auto" +) + +type LogSource struct { + ID string `json:"id"` + Config map[string]interface{} `json:"config"` + Creator auto.Label `json:"creator" aolabel:"user"` + Updater auto.Label `json:"updater" aolabel:"user"` + CreateAt auto.TimeLabel `json:"create_time"` + UpdateAt auto.TimeLabel `json:"update_time"` +} diff --git a/module/log/iml.go b/module/log/iml.go new file mode 100644 index 00000000..eb3d39d5 --- /dev/null +++ b/module/log/iml.go @@ -0,0 +1,62 @@ +package log + +import ( + "context" + "encoding/json" + "errors" + + "gorm.io/gorm" + + "github.com/APIParkLab/APIPark/service/cluster" + + "github.com/eolinker/go-common/auto" + + log_dto "github.com/APIParkLab/APIPark/module/log/dto" + "github.com/APIParkLab/APIPark/service/log" +) + +var _ ILogModule = (*imlLogModule)(nil) + +type imlLogModule struct { + service log.ILogService `autowired:""` +} + +func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.Save) error { + input.Cluster = cluster.DefaultClusterID + var cfg *string + if input.Config != nil { + data, _ := json.Marshal(input.Config) + tmp := string(data) + cfg = &tmp + } + return i.service.UpdateLogSource(ctx, driver, &log.Save{ + ID: input.ID, + Cluster: &input.Cluster, + Config: cfg, + }) +} + +func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSource, error) { + info, err := i.service.GetLogSource(ctx, driver) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err + } + return nil, nil + } + cfg := make(map[string]interface{}) + if info.Config != "" { + err = json.Unmarshal([]byte(info.Config), &cfg) + if err != nil { + return nil, err + } + } + return &log_dto.LogSource{ + ID: info.ID, + Config: cfg, + Creator: auto.UUID(info.Creator), + Updater: auto.UUID(info.Updater), + CreateAt: auto.TimeLabel(info.CreateAt), + UpdateAt: auto.TimeLabel(info.UpdateAt), + }, nil +} diff --git a/module/log/module.go b/module/log/module.go new file mode 100644 index 00000000..548e3db3 --- /dev/null +++ b/module/log/module.go @@ -0,0 +1,21 @@ +package log + +import ( + "context" + "reflect" + + "github.com/eolinker/go-common/autowire" + + log_dto "github.com/APIParkLab/APIPark/module/log/dto" +) + +type ILogModule interface { + Save(ctx context.Context, driver string, input *log_dto.Save) error + Get(ctx context.Context, driver string) (*log_dto.LogSource, error) +} + +func init() { + autowire.Auto[ILogModule](func() reflect.Value { + return reflect.ValueOf(new(imlLogModule)) + }) +} diff --git a/module/publish/iml.go b/module/publish/iml.go index c1feb4bd..04cafa75 100644 --- a/module/publish/iml.go +++ b/module/publish/iml.go @@ -2,10 +2,18 @@ package publish import ( "context" + "encoding/json" "errors" "fmt" "time" + strategy_driver "github.com/APIParkLab/APIPark/module/strategy/driver" + strategy_dto "github.com/APIParkLab/APIPark/module/strategy/dto" + + "github.com/eolinker/eosc" + + "github.com/APIParkLab/APIPark/service/strategy" + "github.com/eolinker/go-common/store" "github.com/APIParkLab/APIPark/service/service" @@ -43,6 +51,7 @@ type imlPublishModule struct { publishService publish.IPublishService `autowired:""` apiService api.IAPIService `autowired:""` upstreamService upstream.IUpstreamService `autowired:""` + strategyService strategy.IStrategyService `autowired:""` releaseService release.IReleaseService `autowired:""` clusterService cluster.IClusterService `autowired:""` serviceService service.IServiceService `autowired:""` @@ -59,7 +68,7 @@ func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, return p.Id }) for _, projectId := range projectIds { - releaseInfo, err := m.getProjectRelease(ctx, projectId, partitionId) + releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId) if err != nil { return err } @@ -75,22 +84,15 @@ func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, return nil } -func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, partitionId string) (*gateway.ProjectRelease, error) { - - releaseInfo, err := m.releaseService.GetRunning(ctx, projectID) - if err != nil { - if !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, err - } - return nil, nil - } - commits, err := m.releaseService.GetCommits(ctx, releaseInfo.UUID) +func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, commitId string) (*gateway.ProjectRelease, error) { + commits, err := m.releaseService.GetCommits(ctx, commitId) if err != nil { return nil, err } apiIds := make([]string, 0, len(commits)) apiProxyCommitIds := make([]string, 0, len(commits)) upstreamCommitIds := make([]string, 0, len(commits)) + strategyCommitIds := make([]string, 0, len(commits)) for _, c := range commits { switch c.Type { case release.CommitApiProxy: @@ -98,6 +100,8 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri apiProxyCommitIds = append(apiProxyCommitIds, c.Commit) case release.CommitUpstream: upstreamCommitIds = append(upstreamCommitIds, c.Commit) + case release.CommitStrategy: + strategyCommitIds = append(strategyCommitIds, c.Commit) } } @@ -114,99 +118,11 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri return c.Target, c.Data }) - upstreamCommits, err := m.upstreamService.ListCommit(ctx, upstreamCommitIds...) - if err != nil { - return nil, err + version := commitId + r := &gateway.ProjectRelease{ + Id: projectID, + Version: version, } - version := releaseInfo.UUID - apis := make([]*gateway.ApiRelease, 0, len(apiInfos)) - for _, a := range apiInfos { - apiInfo := &gateway.ApiRelease{ - BasicItem: &gateway.BasicItem{ - ID: a.UUID, - Description: a.Description, - Version: version, - }, - Path: a.Path, - Methods: a.Methods, - Service: a.Service, - } - proxy, ok := proxyCommitMap[a.UUID] - if ok { - apiInfo.ProxyPath = proxy.Path - apiInfo.ProxyHeaders = utils.SliceToSlice(proxy.Headers, func(h *api.Header) *gateway.ProxyHeader { - return &gateway.ProxyHeader{ - Key: h.Key, - Value: h.Value, - } - }) - apiInfo.Retry = proxy.Retry - apiInfo.Timeout = proxy.Timeout - } - apis = append(apis, apiInfo) - } - var upstreamRelease *gateway.UpstreamRelease - for _, c := range upstreamCommits { - if c.Key != partitionId { - continue - } - upstreamRelease = &gateway.UpstreamRelease{ - BasicItem: &gateway.BasicItem{ - ID: c.Target, - Version: version, - MatchLabels: map[string]string{ - "serviceId": projectID, - }, - }, - PassHost: c.Data.PassHost, - Scheme: c.Data.Scheme, - Balance: c.Data.Balance, - Timeout: c.Data.Timeout, - Nodes: utils.SliceToSlice(c.Data.Nodes, func(n *upstream.NodeConfig) string { - return fmt.Sprintf("%s weight=%d", n.Address, n.Weight) - }), - } - } - - return &gateway.ProjectRelease{ - Id: projectID, - Version: version, - Apis: apis, - Upstream: upstreamRelease, - }, nil -} - -func (m *imlPublishModule) getReleaseInfo(ctx context.Context, projectID, releaseId, version string, clusterIds []string) (map[string]*gateway.ProjectRelease, error) { - commits, err := m.releaseService.GetCommits(ctx, releaseId) - if err != nil { - return nil, err - } - apiIds := make([]string, 0, len(commits)) - apiProxyCommitIds := make([]string, 0, len(commits)) - upstreamCommitIds := make([]string, 0, len(commits)) - for _, c := range commits { - switch c.Type { - case release.CommitApiProxy: - apiIds = append(apiIds, c.Target) - apiProxyCommitIds = append(apiProxyCommitIds, c.Commit) - case release.CommitUpstream: - upstreamCommitIds = append(upstreamCommitIds, c.Commit) - } - } - - apiInfos, err := m.apiService.ListInfo(ctx, apiIds...) - if err != nil { - return nil, err - } - - proxyCommits, err := m.apiService.ListProxyCommit(ctx, apiProxyCommitIds...) - if err != nil { - return nil, err - } - proxyCommitMap := utils.SliceToMapO(proxyCommits, func(c *commit.Commit[api.Proxy]) (string, *api.Proxy) { - return c.Target, c.Data - }) - apis := make([]*gateway.ApiRelease, 0, len(apiInfos)) for _, a := range apiInfos { apiInfo := &gateway.ApiRelease{ @@ -240,44 +156,93 @@ func (m *imlPublishModule) getReleaseInfo(ctx context.Context, projectID, releas } apis = append(apis, apiInfo) } - projectReleaseMap := make(map[string]*gateway.ProjectRelease) - upstreamReleaseMap := make(map[string]*gateway.UpstreamRelease) + r.Apis = apis + var upstreamRelease *gateway.UpstreamRelease if len(upstreamCommitIds) > 0 { upstreamCommits, err := m.upstreamService.ListCommit(ctx, upstreamCommitIds...) if err != nil { return nil, err } for _, c := range upstreamCommits { - for _, partitionId := range clusterIds { - upstreamRelease := &gateway.UpstreamRelease{ - BasicItem: &gateway.BasicItem{ - ID: c.Target, - Version: version, - MatchLabels: map[string]string{ - "serviceId": projectID, - }, + upstreamRelease = &gateway.UpstreamRelease{ + BasicItem: &gateway.BasicItem{ + ID: c.Target, + Version: version, + MatchLabels: map[string]string{ + "serviceId": projectID, }, - PassHost: c.Data.PassHost, - Scheme: c.Data.Scheme, - Balance: c.Data.Balance, - Timeout: c.Data.Timeout, - Nodes: utils.SliceToSlice(c.Data.Nodes, func(n *upstream.NodeConfig) string { - return fmt.Sprintf("%s weight=%d", n.Address, n.Weight) - }), - } - - upstreamReleaseMap[partitionId] = upstreamRelease + }, + PassHost: c.Data.PassHost, + Scheme: c.Data.Scheme, + Balance: c.Data.Balance, + Timeout: c.Data.Timeout, + Nodes: utils.SliceToSlice(c.Data.Nodes, func(n *upstream.NodeConfig) string { + return fmt.Sprintf("%s weight=%d", n.Address, n.Weight) + }), } } - + r.Upstream = upstreamRelease + } + if len(strategyCommitIds) > 0 { + strategyCommits, err := m.strategyService.ListStrategyCommit(ctx, strategyCommitIds...) + if err != nil { + return nil, err + } + strategyReleases := make([]*eosc.Base[gateway.StrategyRelease], 0, len(strategyCommits)) + for _, c := range strategyCommits { + s := c.Data + driver, has := strategy_driver.GetDriver(c.Data.Driver) + if !has { + continue + } + filters := make([]*strategy_dto.Filter, 0) + json.Unmarshal([]byte(s.Filters), &filters) + var cfg interface{} + json.Unmarshal([]byte(s.Config), &cfg) + strategyReleases = append(strategyReleases, driver.ToRelease(&strategy_dto.Strategy{ + Id: fmt.Sprintf("%s-%s", projectID, s.Id), + Name: s.Name, + Priority: s.Priority, + Filters: filters, + Config: cfg, + IsDelete: s.IsDelete || s.IsStop, + }, map[string][]string{ + "provider": {projectID}, + }, 5000)) + } + r.Strategies = strategyReleases } + return r, nil +} + +func (m *imlPublishModule) GetProjectRelease(ctx context.Context, projectID string, partitionId string) (*gateway.ProjectRelease, error) { + + releaseInfo, err := m.releaseService.GetRunning(ctx, projectID) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err + } + return nil, nil + } + + return m.getProjectRelease(ctx, projectID, releaseInfo.UUID) +} + +func (m *imlPublishModule) getReleaseInfo(ctx context.Context, projectID, releaseId, version string, clusterIds []string) (map[string]*gateway.ProjectRelease, error) { + projectRelease, err := m.getProjectRelease(ctx, projectID, releaseId) + if err != nil { + return nil, err + } + + projectReleaseMap := make(map[string]*gateway.ProjectRelease) for _, clusterId := range clusterIds { projectReleaseMap[clusterId] = &gateway.ProjectRelease{ - Id: projectID, - Version: version, - Apis: apis, - Upstream: upstreamReleaseMap[clusterId], + Id: projectID, + Version: version, + Apis: projectRelease.Apis, + Upstream: projectRelease.Upstream, + Strategies: projectRelease.Strategies, } } return projectReleaseMap, nil diff --git a/module/release/iml.go b/module/release/iml.go index 24fc4a20..b38a0ecb 100644 --- a/module/release/iml.go +++ b/module/release/iml.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" + "github.com/google/uuid" + "github.com/APIParkLab/APIPark/service/strategy" api_doc "github.com/APIParkLab/APIPark/service/api-doc" @@ -48,18 +50,19 @@ type imlReleaseModule struct { clusterService cluster.IClusterService `autowired:""` } -func (m *imlReleaseModule) latestStrategyCommits(ctx context.Context, serviceId string) ([]*commit.Commit[strategy.StrategyCommit], error) { +func (m *imlReleaseModule) latestStrategyCommits(ctx context.Context, serviceId string) ([]*commit.Commit[strategy.Commit], error) { list, err := m.strategyService.All(ctx, 2, serviceId) if err != nil { return nil, fmt.Errorf("get latest strategy failed:%w", err) } - return utils.SliceToSlice(list, func(s *strategy.Strategy) *commit.Commit[strategy.StrategyCommit] { + return utils.SliceToSlice(list, func(s *strategy.Strategy) *commit.Commit[strategy.Commit] { key := fmt.Sprintf("service-%s", s.Id) - return &commit.Commit[strategy.StrategyCommit]{ + return &commit.Commit[strategy.Commit]{ + UUID: uuid.NewString(), Target: s.Id, Key: key, - Data: &strategy.StrategyCommit{ + Data: &strategy.Commit{ Id: s.Id, Name: s.Name, Priority: s.Priority, @@ -137,7 +140,7 @@ func (m *imlReleaseModule) Create(ctx context.Context, serviceId string, input * if err != nil { return "", err } - strategyCommits := utils.SliceToMapO(strategies, func(c *commit.Commit[strategy.StrategyCommit]) (string, string) { + strategyCommits := utils.SliceToMapO(strategies, func(c *commit.Commit[strategy.Commit]) (string, string) { return c.Target, c.UUID }) @@ -193,6 +196,7 @@ func (m *imlReleaseModule) Create(ctx context.Context, serviceId string, input * if err != nil { return err } + if !m.releaseService.Completeness(utils.SliceToSlice(clusters, func(s *cluster.Cluster) string { return s.Uuid }), apiUUIDS, requestCommits, apiProxy, upstreams) { diff --git a/module/service-diff/iml.go b/module/service-diff/iml.go index 168bbadd..ddc11a87 100644 --- a/module/service-diff/iml.go +++ b/module/service-diff/iml.go @@ -83,18 +83,18 @@ func (m *imlServiceDiff) getBaseInfo(ctx context.Context, serviceId, baseRelease return base, nil } -func (m *imlServiceDiff) latestStrategyCommits(ctx context.Context, serviceId string) ([]*commit.Commit[strategy.StrategyCommit], error) { +func (m *imlServiceDiff) latestStrategyCommits(ctx context.Context, serviceId string) ([]*commit.Commit[strategy.Commit], error) { list, err := m.strategyService.All(ctx, 2, serviceId) if err != nil { return nil, fmt.Errorf("get latest strategy failed:%w", err) } - return utils.SliceToSlice(list, func(s *strategy.Strategy) *commit.Commit[strategy.StrategyCommit] { + return utils.SliceToSlice(list, func(s *strategy.Strategy) *commit.Commit[strategy.Commit] { key := fmt.Sprintf("service-%s", s.Id) - return &commit.Commit[strategy.StrategyCommit]{ + return &commit.Commit[strategy.Commit]{ Target: s.Id, Key: key, - Data: &strategy.StrategyCommit{ + Data: &strategy.Commit{ Id: s.Id, Name: s.Name, Priority: s.Priority, @@ -252,11 +252,11 @@ func (m *imlServiceDiff) getReleaseInfo(ctx context.Context, releaseId string) ( }, nil } -func (m *imlServiceDiff) diffStrategies(base, target []*commit.Commit[strategy.StrategyCommit]) []*service_diff.StrategyDiff { - baseStrategy := utils.SliceToMap(base, func(i *commit.Commit[strategy.StrategyCommit]) string { +func (m *imlServiceDiff) diffStrategies(base, target []*commit.Commit[strategy.Commit]) []*service_diff.StrategyDiff { + baseStrategy := utils.SliceToMap(base, func(i *commit.Commit[strategy.Commit]) string { return i.Target }) - targetStrategy := utils.SliceToMap(target, func(i *commit.Commit[strategy.StrategyCommit]) string { + targetStrategy := utils.SliceToMap(target, func(i *commit.Commit[strategy.Commit]) string { return i.Target }) out := make([]*service_diff.StrategyDiff, 0, len(target)) diff --git a/module/service-diff/out.go b/module/service-diff/out.go index b4b3dbfb..c02c214c 100644 --- a/module/service-diff/out.go +++ b/module/service-diff/out.go @@ -46,5 +46,5 @@ type projectInfo struct { apiProxyCommits []*commit.Commit[api.Proxy] apiDocCommits []*commit.Commit[api_doc.DocCommit] upstreamCommits []*commit.Commit[upstream.Config] - strategyCommits []*commit.Commit[strategy.StrategyCommit] + strategyCommits []*commit.Commit[strategy.Commit] } diff --git a/module/strategy/driver/data-masking/driver.go b/module/strategy/driver/data-masking/driver.go index f3608d82..9a5ea34f 100644 --- a/module/strategy/driver/data-masking/driver.go +++ b/module/strategy/driver/data-masking/driver.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" + "github.com/APIParkLab/APIPark/gateway" + "github.com/eolinker/eosc" + strategy_driver "github.com/APIParkLab/APIPark/module/strategy/driver" strategy_dto "github.com/APIParkLab/APIPark/module/strategy/dto" @@ -22,20 +25,30 @@ func (d *strategyDriver) Driver() string { return "data-masking" } -func (d *strategyDriver) ToApinto(s strategy_dto.Strategy) interface{} { +func (d *strategyDriver) ToRelease(s *strategy_dto.Strategy, labels map[string][]string, initStep int) *eosc.Base[gateway.StrategyRelease] { filters := make(map[string][]string) for _, f := range s.Filters { filters[f.Name] = f.Values } - - return map[string]interface{}{ - "name": s.Filters, - "description": s.Desc, - "priority": s.Priority, - "filters": filters, - d.confName: s.Config, + for key, value := range labels { + filters[key] = value } + + base := eosc.NewBase[gateway.StrategyRelease]() + base.Config = &gateway.StrategyRelease{ + Name: s.Id, + Desc: s.Name, + Driver: "data_mask", + Priority: initStep + s.Priority, + Filters: filters, + IsDelete: s.IsDelete, + } + cfg := make(map[string]interface{}) + cfg[d.confName] = s.Config + base.Append = cfg + return base + } func (d *strategyDriver) Check(config interface{}) error { diff --git a/module/strategy/driver/driver.go b/module/strategy/driver/driver.go index 1bb5e4a2..2779ebeb 100644 --- a/module/strategy/driver/driver.go +++ b/module/strategy/driver/driver.go @@ -1,11 +1,13 @@ package strategy_driver import ( + "github.com/APIParkLab/APIPark/gateway" strategy_dto "github.com/APIParkLab/APIPark/module/strategy/dto" + "github.com/eolinker/eosc" ) type IStrategyDriver interface { Driver() string - ToApinto(strategy strategy_dto.Strategy) interface{} + ToRelease(s *strategy_dto.Strategy, labels map[string][]string, initStep int) *eosc.Base[gateway.StrategyRelease] Check(config interface{}) error } diff --git a/module/strategy/dto/output.go b/module/strategy/dto/output.go index e299f257..31ae38b2 100644 --- a/module/strategy/dto/output.go +++ b/module/strategy/dto/output.go @@ -25,7 +25,7 @@ func StrategyStatus(s *strategy.Strategy, publishVersion string) string { return publishStatus } -func ToStrategyItem(s *strategy.Strategy, publishVersion string, filters string) *StrategyItem { +func ToStrategyItem(s *strategy.Strategy, publishVersion string, filters string, processedTotal int64) *StrategyItem { publishStatus := PublishStatusOffline if publishVersion != "" { if s.IsDelete { @@ -48,7 +48,7 @@ func ToStrategyItem(s *strategy.Strategy, publishVersion string, filters string) Filters: filters, Updater: auto.UUID(s.Updater), UpdateTime: auto.TimeLabel(s.UpdateAt), - ProcessedTotal: 0, + ProcessedTotal: processedTotal, PublishStatus: publishStatus, IsStop: s.IsStop, IsDelete: s.IsDelete, @@ -67,6 +67,7 @@ func ToStrategy(s *strategy.Strategy) *Strategy { Desc: s.Desc, Filters: filters, Config: cfg, + IsDelete: s.IsDelete || s.IsStop, } } @@ -77,6 +78,7 @@ type Strategy struct { Desc string `json:"desc"` Filters []*Filter `json:"filters"` Config interface{} `json:"config"` + IsDelete bool `json:"is_delete"` } type StrategyItem struct { @@ -87,7 +89,7 @@ type StrategyItem struct { Filters string `json:"filters"` Updater auto.Label `json:"updater" aolabel:"user"` UpdateTime auto.TimeLabel `json:"update_time"` - ProcessedTotal int `json:"processed_total"` + ProcessedTotal int64 `json:"processed_total"` PublishStatus string `json:"publish_status"` IsStop bool `json:"is_stop"` IsDelete bool `json:"is_delete"` @@ -112,3 +114,21 @@ type ToPublishItem struct { Status string `json:"status"` OptTime time.Time `json:"opt_time"` } + +type LogItem struct { + ID string `json:"id"` + Service auto.Label `json:"service" aolabel:"service"` + Method string `json:"method"` + Url string `json:"url"` + RemoteIP string `json:"remote_ip"` + Consumer auto.Label `json:"consumer" aolabel:"service"` + Authorization auto.Label `json:"authorization" aolabel:"authorization"` + RecordTime auto.TimeLabel `json:"record_time"` +} + +type LogInfo struct { + ID string `json:"id"` + ContentType string `json:"content_type"` + ProxyResponseBody string `json:"origin"` + ResponseBody string `json:"target"` +} diff --git a/module/strategy/iml.go b/module/strategy/iml.go index 41b99d4e..9b6ac886 100644 --- a/module/strategy/iml.go +++ b/module/strategy/iml.go @@ -5,11 +5,22 @@ import ( "encoding/json" "errors" "fmt" + "sort" "strings" + "time" - "gorm.io/gorm" + "github.com/APIParkLab/APIPark/service/service" + "github.com/eolinker/go-common/auto" + + "github.com/APIParkLab/APIPark/service/cluster" + + "github.com/APIParkLab/APIPark/gateway" + "github.com/eolinker/eosc" + + log2 "github.com/APIParkLab/APIPark/service/log" "github.com/eolinker/eosc/log" + "gorm.io/gorm" strategy_filter "github.com/APIParkLab/APIPark/strategy-filter" @@ -32,9 +43,74 @@ var _ IStrategyModule = (*imlStrategyModule)(nil) type imlStrategyModule struct { strategyService strategy.IStrategyService `autowired:""` + appService service.IServiceService `autowired:""` + logService log2.ILogService `autowired:""` + clusterService cluster.IClusterService `autowired:""` transaction store.ITransaction `autowired:""` } +func (i *imlStrategyModule) StrategyLogInfo(ctx context.Context, id string) (*strategy_dto.LogInfo, error) { + c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID) + if err != nil { + return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID) + } + + info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, id) + if err != nil { + return nil, err + } + return &strategy_dto.LogInfo{ + ID: info.ID, + ContentType: info.ContentType, + ProxyResponseBody: info.ProxyResponseBody, + ResponseBody: info.ResponseBody, + }, nil +} + +func (i *imlStrategyModule) GetStrategyLogs(ctx context.Context, keyword string, strategyID string, start time.Time, end time.Time, limit int64, offset int64) ([]*strategy_dto.LogItem, int64, error) { + conditions := map[string]string{ + "block_name": strategyID, + } + if keyword != "" { + // 查询符合条件的应用ID + apps, err := i.appService.Search(ctx, keyword, map[string]interface{}{ + "as_app": true, + }) + if err != nil { + return nil, 0, err + } + orCondition := fmt.Sprintf("request_uri =~ \"*%s*\"", keyword) + if len(apps) > 0 { + appIds := utils.SliceToSlice(apps, func(a *service.Service) string { return a.Id }) + orCondition = fmt.Sprintf("%s or application =~ \"%s\"", orCondition, strings.Join(appIds, "|")) + } + conditions["#1"] = orCondition + } + + c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID) + if err != nil { + return nil, 0, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID) + } + items, total, err := i.logService.Logs(ctx, "loki", c.Cluster, conditions, start, end, limit, offset) + if err != nil { + return nil, 0, err + } + result := make([]*strategy_dto.LogItem, 0, len(items)) + for _, item := range items { + result = append(result, &strategy_dto.LogItem{ + ID: item.ID, + Service: auto.UUID(item.Service), + Method: item.Method, + Url: item.Url, + RemoteIP: item.RemoteIP, + Consumer: auto.UUID(item.Consumer), + Authorization: auto.UUID(item.Authorization), + RecordTime: auto.TimeLabel(item.RecordTime), + }) + } + return result, total, nil +} + func (i *imlStrategyModule) Restore(ctx context.Context, id string) error { return i.strategyService.Restore(ctx, id) } @@ -62,7 +138,7 @@ func (i *imlStrategyModule) ToPublish(ctx context.Context, driver string) ([]*st if err != nil { return nil, err } - commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.StrategyCommit]) (string, string) { return c.Data.Id, c.Data.Version }) + commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.Commit]) (string, string) { return c.Data.Id, c.Data.Version }) items := make([]*strategy_dto.ToPublishItem, 0, len(list)) for _, l := range list { status := strategy_dto.StrategyStatus(l, commitMap[l.Id]) @@ -84,13 +160,28 @@ func (i *imlStrategyModule) Search(ctx context.Context, keyword string, driver s if err != nil { return nil, 0, err } + if len(list) < 1 { + return nil, 0, nil + } strategyIds := utils.SliceToSlice(list, func(l *strategy.Strategy) string { return l.Id }) commits, err := i.strategyService.ListLatestStrategyCommit(ctx, scope.String(), target, strategyIds...) if err != nil { return nil, 0, err } - commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.StrategyCommit]) (string, string) { return c.Data.Id, c.Data.Version }) + commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.Commit]) (string, string) { return c.Data.Id, c.Data.Version }) items := make([]*strategy_dto.StrategyItem, 0, len(list)) + countMap := make(map[string]int64) + c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID) + if err == nil { + countMap, err = i.logService.LogCount(ctx, "loki", c.Cluster, map[string]string{ + "#1": fmt.Sprintf("block_name =~ \"%s\"", strings.Join(strategyIds, "|")), + }, 720, + "block_name") + if err != nil { + log.Errorf("get log count error: %v", err) + } + } + for _, l := range list { fs := make([]*strategy_dto.Filter, 0) @@ -104,9 +195,12 @@ func (i *imlStrategyModule) Search(ctx context.Context, keyword string, driver s } filterList = append(filterList, fmt.Sprintf("[%s:%s]", info.Title, info.Label)) } - item := strategy_dto.ToStrategyItem(l, commitMap[l.Id], strings.Join(filterList, ";")) + item := strategy_dto.ToStrategyItem(l, commitMap[l.Id], strings.Join(filterList, ";"), countMap[l.Id]) items = append(items, item) } + sort.Slice(items, func(i, j int) bool { + return items[i].Priority < items[j].Priority + }) return items, total, nil } @@ -213,11 +307,17 @@ func (i *imlStrategyModule) Disable(ctx context.Context, id string) error { } func (i *imlStrategyModule) Publish(ctx context.Context, driver string, scope string, target string) error { + d, has := strategy_driver.GetDriver(driver) + if !has { + return fmt.Errorf("driver not found: %s", driver) + } list, err := i.strategyService.AllByDriver(ctx, driver, strategy_dto.ToScope(scope).Int(), target) if err != nil { return err } + return i.transaction.Transaction(ctx, func(txCtx context.Context) error { + publishStrategies := make([]*eosc.Base[gateway.StrategyRelease], 0, len(list)) for _, l := range list { if l.IsDelete { err = i.strategyService.Delete(ctx, l.Id) @@ -225,14 +325,21 @@ func (i *imlStrategyModule) Publish(ctx context.Context, driver string, scope st return err } } + publishStrategies = append(publishStrategies, d.ToRelease(strategy_dto.ToStrategy(l), nil, 0)) - // TODO:同步到网关 err = i.strategyService.CommitStrategy(txCtx, scope, target, l.Id, l) if err != nil { return err } } - return nil + client, err := i.clusterService.GatewayClient(ctx, cluster.DefaultClusterID) + if err != nil { + return err + } + defer func() { + _ = client.Close(ctx) + }() + return client.Strategy().Online(ctx, publishStrategies...) }) } diff --git a/module/strategy/module.go b/module/strategy/module.go index 396622f2..c716f9c0 100644 --- a/module/strategy/module.go +++ b/module/strategy/module.go @@ -3,6 +3,7 @@ package strategy import ( "context" "reflect" + "time" "github.com/eolinker/go-common/autowire" @@ -23,6 +24,9 @@ type IStrategyModule interface { Restore(ctx context.Context, id string) error DeleteServiceStrategy(ctx context.Context, serviceId string, id string) error + + StrategyLogInfo(ctx context.Context, id string) (*strategy_dto.LogInfo, error) + GetStrategyLogs(ctx context.Context, keyword string, strategyID string, start time.Time, end time.Time, limit int64, offset int64) ([]*strategy_dto.LogItem, int64, error) } func init() { diff --git a/plugins/core/core.go b/plugins/core/core.go index 1049006d..7a3e57d6 100644 --- a/plugins/core/core.go +++ b/plugins/core/core.go @@ -3,6 +3,8 @@ package core import ( "net/http" + "github.com/APIParkLab/APIPark/controller/log" + "github.com/APIParkLab/APIPark/controller/strategy" "github.com/APIParkLab/APIPark/controller/ai" @@ -87,6 +89,7 @@ type plugin struct { aiProviderController ai.IProviderController `autowired:""` settingController system.ISettingController `autowired:""` initController system.IInitController `autowired:""` + logController log.ILogController `autowired:""` apis []pm3.Api } @@ -109,6 +112,7 @@ func (p *plugin) OnComplete() { p.apis = append(p.apis, p.systemApis()...) p.apis = append(p.apis, p.aiAPIs()...) p.apis = append(p.apis, p.strategyApis()...) + p.apis = append(p.apis, p.logApis()...) } func (p *plugin) Name() string { diff --git a/plugins/core/log.go b/plugins/core/log.go new file mode 100644 index 00000000..98a6dc25 --- /dev/null +++ b/plugins/core/log.go @@ -0,0 +1,14 @@ +package core + +import ( + "net/http" + + "github.com/eolinker/go-common/pm3" +) + +func (p *plugin) logApis() []pm3.Api { + return []pm3.Api{ + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/log/:driver", []string{"context", "rest:driver"}, []string{"info"}, p.logController.Get), + pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/log/:driver", []string{"context", "rest:driver", "body"}, nil, p.logController.Save), + } +} diff --git a/plugins/core/strategy.go b/plugins/core/strategy.go index d9e9a074..6d8ca8f3 100644 --- a/plugins/core/strategy.go +++ b/plugins/core/strategy.go @@ -33,5 +33,7 @@ func (p *plugin) strategyApis() []pm3.Api { pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/strategy/service/filter-remote/:name", []string{"context", "query:service", "rest:name"}, []string{"titles", "list", "total", "key", "value"}, p.strategyController.FilterServiceRemote), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/strategy/global/:driver/to-publishs", []string{"context", "rest:driver"}, []string{"strategies", "source", "version_name", "is_publish"}, p.strategyController.ToPublish), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/strategy/global/:driver/logs", []string{"context", "query:keyword", "query:strategy", "query:begin", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.strategyController.GetStrategyLogs), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/strategy/global/:driver/log", []string{"context", "query:id"}, []string{"log"}, p.strategyController.LogInfo), } } diff --git a/resources/plugin/plugin.yml b/resources/plugin/plugin.yml index d1996727..f731a53f 100644 --- a/resources/plugin/plugin.yml +++ b/resources/plugin/plugin.yml @@ -1,4 +1,4 @@ -version: v6 +version: v7 sort: - "access_log" - "monitor" @@ -75,6 +75,12 @@ plugin: status: global config: cache: redis@output + strategy_data_mask: + id: eolinker.com:apinto:strategy-plugin-data_mask + name: strategy_data_mask + status: global + config: + cache: redis@output ai_prompt: id: eolinker.com:apinto:ai_prompt name: ai_prompt diff --git a/resources/plugin/render/eolinker_com_apinto_strategy-plugin-data_mask.json b/resources/plugin/render/eolinker_com_apinto_strategy-plugin-data_mask.json new file mode 100644 index 00000000..01d71639 --- /dev/null +++ b/resources/plugin/render/eolinker_com_apinto_strategy-plugin-data_mask.json @@ -0,0 +1 @@ +{"group":"eolinker.com","project":"apinto","name":"strategy-plugin-data_mask","version":"innert","render":{"type":"object","eo:type":"object"}} \ No newline at end of file diff --git a/resources/plugin/render/render.init.sh b/resources/plugin/render/render.init.sh index 8ada333f..69377aec 100644 --- a/resources/plugin/render/render.init.sh +++ b/resources/plugin/render/render.init.sh @@ -1,6 +1,7 @@ #!/bin/sh curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/access_log > eolinker_com_apinto_access_log.json curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/strategy-plugin-visit > eolinker_com_apinto_strategy-plugin-visit.json +curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/strategy-plugin-data_mask > eolinker_com_apinto_strategy-plugin-data_mask.json curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/plugin_app > eolinker_com_apinto_plugin_app.json curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/monitor > eolinker_com_apinto_monitor.json curl -s http://127.0.0.1:9400/extender/eolinker.com:apinto/http_to_dubbo2 > eolinker_com_apinto_http_to_dubbo2.json diff --git a/service/cluster/cluster.go b/service/cluster/cluster.go index 3156b044..0550c748 100644 --- a/service/cluster/cluster.go +++ b/service/cluster/cluster.go @@ -159,6 +159,7 @@ func (s *imlClusterService) Create(ctx context.Context, name string, resume stri UUID: apintoInfo.Cluster, Name: name, Resume: resume, + Cluster: apintoInfo.Cluster, Creator: operator, Updater: operator, CreateAt: time.Now(), @@ -285,13 +286,15 @@ func (s *imlClusterService) UpdateAddress(ctx context.Context, id string, addres return nil, err } cv = &cluster.Cluster{ - UUID: id, - Name: "默认集群", - Resume: "默认集群", + UUID: id, + Name: "默认集群", + Resume: "默认集群", + Creator: operator, CreateAt: now, } } + cv.Cluster = info.Cluster // check node nodeIds := utils.SliceToSlice(info.Nodes, func(i *admin.Node) string { diff --git a/service/cluster/model.go b/service/cluster/model.go index d6857f87..be0c31ee 100644 --- a/service/cluster/model.go +++ b/service/cluster/model.go @@ -2,7 +2,7 @@ package cluster import ( "time" - + "github.com/APIParkLab/APIPark/stores/cluster" ) @@ -10,6 +10,7 @@ type Cluster struct { Uuid string Name string Resume string + Cluster string Creator string Updater string Status int @@ -22,6 +23,7 @@ func FromEntity(entity *cluster.Cluster) *Cluster { Uuid: entity.UUID, Name: entity.Name, Resume: entity.Resume, + Cluster: entity.Cluster, Creator: entity.Creator, Updater: entity.Updater, CreateTime: entity.CreateAt, diff --git a/service/log/iml.go b/service/log/iml.go new file mode 100644 index 00000000..eba95f5d --- /dev/null +++ b/service/log/iml.go @@ -0,0 +1,156 @@ +package log + +import ( + "context" + "errors" + "time" + + "github.com/google/uuid" + + log_driver "github.com/APIParkLab/APIPark/log-driver" + + "github.com/eolinker/go-common/utils" + + "gorm.io/gorm" + + log_source "github.com/APIParkLab/APIPark/stores/log-source" +) + +var ( + _ ILogService = (*imlLogService)(nil) +) + +type imlLogService struct { + store log_source.ILogSourceStore `autowired:""` +} + +func (i *imlLogService) OnComplete() { + drivers := log_driver.Drivers() + for _, d := range drivers { + factory, has := log_driver.GetFactory(d) + if !has { + continue + } + s, err := i.GetLogSource(context.Background(), d) + if err != nil { + continue + } + driver, err := factory.Create(s.Config) + if err != nil { + continue + } + log_driver.SetDriver(d, driver) + + } +} + +func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, input *Save) error { + factory, has := log_driver.GetFactory(driver) + if !has { + return errors.New("driver not found") + } + s, err := i.store.First(ctx, map[string]interface{}{"driver": driver}) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + if input.ID == "" { + input.ID = uuid.NewString() + } + if input.Cluster == nil || *input.Cluster == "" { + return errors.New("cluster is required") + } + if input.Config == nil || *input.Config == "" { + return errors.New("config is required") + } + now := time.Now() + userId := utils.UserId(ctx) + s = &log_source.Log{ + UUID: input.ID, + Cluster: *input.Cluster, + Driver: driver, + Config: *input.Config, + Creator: userId, + Updater: userId, + CreateAt: now, + UpdateAt: now, + } + + } else { + if input.Config == nil || *input.Config == "" { + s.Config = *input.Config + } + s.Updater = utils.UserId(ctx) + s.UpdateAt = time.Now() + } + newDriver, err := factory.Create(s.Config) + if err != nil { + return err + } + err = i.store.Save(ctx, s) + if err != nil { + return err + } + log_driver.SetDriver(driver, newDriver) + return nil +} + +func (i *imlLogService) GetLogSource(ctx context.Context, driver string) (*Source, error) { + s, err := i.store.First(ctx, map[string]interface{}{"driver": driver}) + if err != nil { + return nil, err + } + return FromEntity(s), nil +} + +func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error) { + d, has := log_driver.GetDriver(driver) + if !has { + return nil, 0, errors.New("driver not found") + } + list, count, err := d.Logs(cluster, conditions, start, end, limit, offset) + if err != nil { + return nil, 0, err + } + result := make([]*Item, 0, len(list)) + for _, l := range list { + result = append(result, &Item{ + ID: l.ID, + Service: l.Service, + Method: l.Method, + Url: l.Url, + RemoteIP: l.RemoteIP, + Consumer: l.Consumer, + Authorization: l.Authorization, + RecordTime: l.RecordTime, + }) + } + return result, count, nil +} + +func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) { + d, has := log_driver.GetDriver(driver) + if !has { + return nil, errors.New("driver not found") + } + return d.LogCount(cluster, conditions, spendHour, group) +} + +func (i *imlLogService) LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error) { + d, has := log_driver.GetDriver(driver) + if !has { + return nil, errors.New("driver not found") + } + info, err := d.LogInfo(cluster, id) + if err != nil { + return nil, err + } + return &Info{ + ID: info.ID, + ContentType: info.ContentType, + RequestBody: info.RequestBody, + ProxyBody: info.ProxyBody, + ProxyResponseBody: info.ProxyResponseBody, + ResponseBody: info.ResponseBody, + }, nil +} diff --git a/service/log/model.go b/service/log/model.go new file mode 100644 index 00000000..c42728ac --- /dev/null +++ b/service/log/model.go @@ -0,0 +1,57 @@ +package log + +import ( + "time" + + log_source "github.com/APIParkLab/APIPark/stores/log-source" +) + +type Save struct { + ID string + Cluster *string + Config *string +} + +type Source struct { + ID string + Cluster string + Driver string + Config string + Creator string + Updater string + CreateAt time.Time + UpdateAt time.Time +} + +func FromEntity(ov *log_source.Log) *Source { + return &Source{ + ID: ov.UUID, + Cluster: ov.Cluster, + Driver: ov.Driver, + Config: ov.Config, + Creator: ov.Creator, + Updater: ov.Updater, + CreateAt: ov.CreateAt, + UpdateAt: ov.UpdateAt, + } +} + +type Item struct { + ID string + Service string + Method string + Url string + RemoteIP string + Consumer string + Authorization string + RecordTime time.Time +} + +type Info struct { + ID string + ContentType string + RequestBody string + ProxyBody string + ProxyResponseBody string + ResponseBody string +} diff --git a/service/log/service.go b/service/log/service.go new file mode 100644 index 00000000..bedfbf28 --- /dev/null +++ b/service/log/service.go @@ -0,0 +1,25 @@ +package log + +import ( + "context" + "reflect" + "time" + + _ "github.com/APIParkLab/APIPark/log-driver/loki" + "github.com/eolinker/go-common/autowire" +) + +type ILogService interface { + UpdateLogSource(ctx context.Context, driver string, input *Save) error + GetLogSource(ctx context.Context, driver string) (*Source, error) + Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error) + LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) + LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error) +} + +func init() { + logService := &imlLogService{} + autowire.Auto[ILogService](func() reflect.Value { + return reflect.ValueOf(logService) + }) +} diff --git a/service/strategy/iml.go b/service/strategy/iml.go index 3398c0a5..0f4ca210 100644 --- a/service/strategy/iml.go +++ b/service/strategy/iml.go @@ -16,8 +16,8 @@ import ( var _ IStrategyService = (*imlStrategyService)(nil) type imlStrategyService struct { - store strategy.IStrategyStore `autowired:""` - commitService commit.ICommitService[StrategyCommit] `autowired:""` + store strategy.IStrategyStore `autowired:""` + commitService commit.ICommitService[Commit] `autowired:""` universally.IServiceCreate[Create] universally.IServiceEdit[Edit] } @@ -73,23 +73,24 @@ func (i *imlStrategyService) CommitStrategy(ctx context.Context, scope string, t key = fmt.Sprintf("%s-%s", scope, target) } - return i.commitService.Save(ctx, strategyId, key, &StrategyCommit{ + return i.commitService.Save(ctx, strategyId, key, &Commit{ Id: data.Id, Name: data.Name, Priority: data.Priority, Filters: data.Filters, Config: data.Config, Driver: data.Driver, + IsDelete: data.IsDelete, IsStop: data.IsStop, Version: data.UpdateAt.Format("20060102150405"), }) } -func (i *imlStrategyService) GetStrategyCommit(ctx context.Context, commitId string) (*commit.Commit[StrategyCommit], error) { +func (i *imlStrategyService) GetStrategyCommit(ctx context.Context, commitId string) (*commit.Commit[Commit], error) { return i.commitService.Get(ctx, commitId) } -func (i *imlStrategyService) LatestStrategyCommit(ctx context.Context, scope string, target string, strategyId string) (*commit.Commit[StrategyCommit], error) { +func (i *imlStrategyService) LatestStrategyCommit(ctx context.Context, scope string, target string, strategyId string) (*commit.Commit[Commit], error) { key := scope if target != "" { key = fmt.Sprintf("%s-%s", scope, target) @@ -97,7 +98,7 @@ func (i *imlStrategyService) LatestStrategyCommit(ctx context.Context, scope str return i.commitService.Latest(ctx, strategyId, key) } -func (i *imlStrategyService) ListLatestStrategyCommit(ctx context.Context, scope string, target string, strategyIds ...string) ([]*commit.Commit[StrategyCommit], error) { +func (i *imlStrategyService) ListLatestStrategyCommit(ctx context.Context, scope string, target string, strategyIds ...string) ([]*commit.Commit[Commit], error) { key := scope if target != "" { key = fmt.Sprintf("%s-%s", scope, target) @@ -105,7 +106,7 @@ func (i *imlStrategyService) ListLatestStrategyCommit(ctx context.Context, scope return i.commitService.ListLatest(ctx, key, strategyIds...) } -func (i *imlStrategyService) ListStrategyCommit(ctx context.Context, commitIds ...string) ([]*commit.Commit[StrategyCommit], error) { +func (i *imlStrategyService) ListStrategyCommit(ctx context.Context, commitIds ...string) ([]*commit.Commit[Commit], error) { if len(commitIds) < 1 { return nil, fmt.Errorf("commit ids is empty") } diff --git a/service/strategy/model.go b/service/strategy/model.go index b6217850..dab4ddd3 100644 --- a/service/strategy/model.go +++ b/service/strategy/model.go @@ -65,13 +65,14 @@ type Edit struct { IsStop *bool } -type StrategyCommit struct { +type Commit struct { Id string Name string Priority int Filters string Config string Driver string + IsDelete bool IsStop bool Version string } diff --git a/service/strategy/service.go b/service/strategy/service.go index 24501de9..5b6209f3 100644 --- a/service/strategy/service.go +++ b/service/strategy/service.go @@ -25,15 +25,15 @@ type IStrategyService interface { Restore(ctx context.Context, id string) error CommitStrategy(ctx context.Context, scope string, target string, strategyId string, data *Strategy) error - GetStrategyCommit(ctx context.Context, commitId string) (*commit.Commit[StrategyCommit], error) - LatestStrategyCommit(ctx context.Context, scope string, target string, strategyId string) (*commit.Commit[StrategyCommit], error) - ListLatestStrategyCommit(ctx context.Context, scope string, target string, strategyIds ...string) ([]*commit.Commit[StrategyCommit], error) - ListStrategyCommit(ctx context.Context, commitIds ...string) ([]*commit.Commit[StrategyCommit], error) + GetStrategyCommit(ctx context.Context, commitId string) (*commit.Commit[Commit], error) + LatestStrategyCommit(ctx context.Context, scope string, target string, strategyId string) (*commit.Commit[Commit], error) + ListLatestStrategyCommit(ctx context.Context, scope string, target string, strategyIds ...string) ([]*commit.Commit[Commit], error) + ListStrategyCommit(ctx context.Context, commitIds ...string) ([]*commit.Commit[Commit], error) } func init() { autowire.Auto[IStrategyService](func() reflect.Value { return reflect.ValueOf(new(imlStrategyService)) }) - commit.InitCommitService[StrategyCommit]("strategy") + commit.InitCommitService[Commit]("strategy") } diff --git a/stores/cluster/model.go b/stores/cluster/model.go index abd3e459..62bc8d3b 100644 --- a/stores/cluster/model.go +++ b/stores/cluster/model.go @@ -3,10 +3,10 @@ package cluster import "time" type Cluster struct { - Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` - UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` - Name string `gorm:"type:varchar(100);not null;column:name;comment:name"` - //Cluster string `gorm:"type:varchar(36);not null;column:partition;comment:partition"` + Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` + UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` + Name string `gorm:"type:varchar(100);not null;column:name;comment:name"` + Cluster string `gorm:"type:varchar(36);not null;column:cluster;comment:cluster id"` Resume string `gorm:"type:varchar(255);not null;column:resume;comment:resume"` Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` diff --git a/stores/log-source/model.go b/stores/log-source/model.go new file mode 100644 index 00000000..b3214cb1 --- /dev/null +++ b/stores/log-source/model.go @@ -0,0 +1,23 @@ +package log_source + +import "time" + +type Log struct { + Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` + UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` + Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"` + Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` + Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"` + Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` + Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` + CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"` + UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"` +} + +func (c *Log) IdValue() int64 { + return c.Id +} + +func (c *Log) TableName() string { + return "log" +} diff --git a/stores/log-source/store.go b/stores/log-source/store.go new file mode 100644 index 00000000..7ccfd599 --- /dev/null +++ b/stores/log-source/store.go @@ -0,0 +1,22 @@ +package log_source + +import ( + "reflect" + + "github.com/eolinker/go-common/autowire" + "github.com/eolinker/go-common/store" +) + +type ILogSourceStore interface { + store.IBaseStore[Log] +} + +type storeLogSource struct { + store.Store[Log] +} + +func init() { + autowire.Auto[ILogSourceStore](func() reflect.Value { + return reflect.ValueOf(new(storeLogSource)) + }) +}