mirror of
https://github.com/APIParkLab/APIPark.git
synced 2026-06-04 10:13:53 +08:00
395 lines
12 KiB
Go
395 lines
12 KiB
Go
package strategy
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/APIParkLab/APIPark/service/service"
|
|
|
|
"github.com/eolinker/go-common/auto"
|
|
|
|
"github.com/APIParkLab/APIPark/service/cluster"
|
|
|
|
"github.com/APIParkLab/APIPark/gateway"
|
|
"github.com/eolinker/eosc"
|
|
|
|
log2 "github.com/APIParkLab/APIPark/service/log"
|
|
"github.com/eolinker/eosc/log"
|
|
"gorm.io/gorm"
|
|
|
|
strategy_filter "github.com/APIParkLab/APIPark/strategy-filter"
|
|
|
|
"github.com/eolinker/go-common/store"
|
|
|
|
"github.com/APIParkLab/APIPark/service/universally/commit"
|
|
|
|
"github.com/eolinker/go-common/utils"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
strategy_driver "github.com/APIParkLab/APIPark/module/strategy/driver"
|
|
|
|
strategy_dto "github.com/APIParkLab/APIPark/module/strategy/dto"
|
|
|
|
"github.com/APIParkLab/APIPark/service/strategy"
|
|
)
|
|
|
|
var _ IStrategyModule = (*imlStrategyModule)(nil)
|
|
|
|
type imlStrategyModule struct {
|
|
strategyService strategy.IStrategyService `autowired:""`
|
|
appService service.IServiceService `autowired:""`
|
|
logService log2.ILogService `autowired:""`
|
|
clusterService cluster.IClusterService `autowired:""`
|
|
transaction store.ITransaction `autowired:""`
|
|
}
|
|
|
|
func (i *imlStrategyModule) StrategyLogInfo(ctx context.Context, id string) (*strategy_dto.LogInfo, error) {
|
|
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
|
}
|
|
|
|
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &strategy_dto.LogInfo{
|
|
ID: info.ID,
|
|
ContentType: info.ContentType,
|
|
ProxyResponseBody: info.ProxyResponseBody,
|
|
ResponseBody: info.ResponseBody,
|
|
}, nil
|
|
}
|
|
|
|
func (i *imlStrategyModule) GetStrategyLogs(ctx context.Context, keyword string, strategyID string, start time.Time, end time.Time, limit int64, offset int64) ([]*strategy_dto.LogItem, int64, error) {
|
|
if strategyID == "" {
|
|
return nil, 0, errors.New("strategy id required")
|
|
}
|
|
conditions := map[string]string{
|
|
"strategy": strategyID,
|
|
}
|
|
if keyword != "" {
|
|
// 查询符合条件的应用ID
|
|
apps, err := i.appService.Search(ctx, keyword, map[string]interface{}{
|
|
"as_app": true,
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
orCondition := fmt.Sprintf("request_uri =~ \".*%s.*\"", keyword)
|
|
if len(apps) > 0 {
|
|
appIds := utils.SliceToSlice(apps, func(a *service.Service) string { return a.Id })
|
|
orCondition = fmt.Sprintf("%s or application =~ \"%s\"", orCondition, strings.Join(appIds, "|"))
|
|
}
|
|
conditions["#1"] = orCondition
|
|
}
|
|
|
|
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
|
}
|
|
items, total, err := i.logService.Logs(ctx, "loki", c.Cluster, conditions, start, end, limit, offset)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
result := make([]*strategy_dto.LogItem, 0, len(items))
|
|
for _, item := range items {
|
|
result = append(result, &strategy_dto.LogItem{
|
|
ID: item.ID,
|
|
Service: auto.UUID(item.Service),
|
|
Method: item.Method,
|
|
Url: item.Url,
|
|
RemoteIP: item.RemoteIP,
|
|
Consumer: auto.UUID(item.Consumer),
|
|
Authorization: auto.UUID(item.Authorization),
|
|
RecordTime: auto.TimeLabel(item.RecordTime),
|
|
})
|
|
}
|
|
return result, total, nil
|
|
}
|
|
|
|
func (i *imlStrategyModule) Restore(ctx context.Context, id string) error {
|
|
return i.strategyService.Restore(ctx, id)
|
|
}
|
|
|
|
func (i *imlStrategyModule) DeleteServiceStrategy(ctx context.Context, serviceId string, id string) error {
|
|
_, err := i.strategyService.LatestStrategyCommit(ctx, strategy_dto.ScopeService, serviceId, id)
|
|
if err != nil {
|
|
// 判断是否已经发布,如果未发布则直接删除
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
return i.strategyService.Delete(ctx, id)
|
|
}
|
|
return i.strategyService.SortDelete(ctx, id)
|
|
}
|
|
|
|
func (i *imlStrategyModule) ToPublish(ctx context.Context, driver string) ([]*strategy_dto.ToPublishItem, error) {
|
|
scope := strategy_dto.ToScope(strategy_dto.ScopeGlobal)
|
|
list, err := i.strategyService.SearchAllByDriver(ctx, "", driver, scope.Int(), "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
strategyIds := utils.SliceToSlice(list, func(l *strategy.Strategy) string { return l.Id })
|
|
commits, err := i.strategyService.ListLatestStrategyCommit(ctx, scope.String(), "", strategyIds...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.Commit]) (string, string) { return c.Data.Id, c.Data.Version })
|
|
items := make([]*strategy_dto.ToPublishItem, 0, len(list))
|
|
for _, l := range list {
|
|
status := strategy_dto.StrategyStatus(l, commitMap[l.Id])
|
|
if status == strategy_dto.PublishStatusOnline {
|
|
continue
|
|
}
|
|
items = append(items, &strategy_dto.ToPublishItem{
|
|
Name: l.Name,
|
|
Priority: l.Priority,
|
|
Status: status,
|
|
OptTime: l.UpdateAt,
|
|
})
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
return items[i].Priority < items[j].Priority
|
|
})
|
|
return items, nil
|
|
}
|
|
|
|
func (i *imlStrategyModule) Search(ctx context.Context, keyword string, driver string, scope strategy_dto.Scope, target string, page int, pageSize int, filters []string, order ...string) ([]*strategy_dto.StrategyItem, int64, error) {
|
|
list, total, err := i.strategyService.SearchByDriver(ctx, keyword, driver, scope.Int(), target, page, pageSize, filters, order...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if len(list) < 1 {
|
|
return nil, 0, nil
|
|
}
|
|
strategyIds := utils.SliceToSlice(list, func(l *strategy.Strategy) string { return l.Id })
|
|
commits, err := i.strategyService.ListLatestStrategyCommit(ctx, scope.String(), target, strategyIds...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
commitMap := utils.SliceToMapO(commits, func(c *commit.Commit[strategy.Commit]) (string, string) { return c.Data.Id, c.Data.Version })
|
|
items := make([]*strategy_dto.StrategyItem, 0, len(list))
|
|
countMap := make(map[string]int64)
|
|
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
|
if err == nil {
|
|
countMap, err = i.logService.LogCount(ctx, "loki", c.Cluster, map[string]string{
|
|
"#1": fmt.Sprintf("strategy =~ \"%s\"", strings.Join(strategyIds, "|")),
|
|
}, 720,
|
|
"strategy")
|
|
if err != nil {
|
|
log.Errorf("get log count error: %v", err)
|
|
}
|
|
}
|
|
|
|
for _, l := range list {
|
|
fs := make([]*strategy_dto.Filter, 0)
|
|
|
|
json.Unmarshal([]byte(l.Filters), &fs)
|
|
filterList := make([]string, 0, len(fs))
|
|
for _, f := range fs {
|
|
info, err := strategy_filter.FilterLabel(f.Name, f.Values)
|
|
if err != nil {
|
|
log.Errorf("get filter label error: %v", err)
|
|
continue
|
|
}
|
|
filterList = append(filterList, fmt.Sprintf("[%s:%s]", info.Title, info.Label))
|
|
}
|
|
item := strategy_dto.ToStrategyItem(l, commitMap[l.Id], strings.Join(filterList, ";"), countMap[l.Id])
|
|
items = append(items, item)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
return items[i].Priority < items[j].Priority
|
|
})
|
|
return items, total, nil
|
|
}
|
|
|
|
func (i *imlStrategyModule) Get(ctx context.Context, id string) (*strategy_dto.Strategy, error) {
|
|
info, err := i.strategyService.Get(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s := strategy_dto.ToStrategy(info)
|
|
for _, f := range s.Filters {
|
|
ff, has := strategy_filter.FilterGet(f.Name)
|
|
if !has {
|
|
return nil, fmt.Errorf("filter not found: %s", f.Name)
|
|
}
|
|
f.Title = ff.Title()
|
|
f.Type = ff.Type()
|
|
f.Label = strings.Join(ff.Labels(f.Values...), ",")
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (i *imlStrategyModule) Create(ctx context.Context, input *strategy_dto.Create) error {
|
|
if input.Name == "" {
|
|
return fmt.Errorf("name required")
|
|
}
|
|
if input.ID == "" {
|
|
input.ID = uuid.NewString()
|
|
}
|
|
|
|
if input.Priority < 1 {
|
|
input.Priority = 1000
|
|
}
|
|
err := strategy_filter.CheckFilters(input.Driver, input.Scope, input.Filters)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = strategy_driver.CheckConfig(input.Driver, input.Config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
filters, _ := json.Marshal(input.Filters)
|
|
cfg, _ := json.Marshal(input.Config)
|
|
return i.strategyService.Create(ctx, &strategy.Create{
|
|
Id: input.ID,
|
|
Name: input.Name,
|
|
Priority: input.Priority,
|
|
Desc: input.Desc,
|
|
Filters: string(filters),
|
|
Config: string(cfg),
|
|
Scope: input.Scope.Int(),
|
|
Target: input.Target,
|
|
Driver: input.Driver,
|
|
})
|
|
}
|
|
|
|
func (i *imlStrategyModule) Edit(ctx context.Context, id string, input *strategy_dto.Edit) error {
|
|
if input.Name != nil && *input.Name == "" {
|
|
return fmt.Errorf("name required")
|
|
}
|
|
info, err := i.strategyService.Get(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if input.Priority != nil && *input.Priority < 1 {
|
|
*input.Priority = 1000
|
|
}
|
|
filters := info.Filters
|
|
if input.Filters != nil {
|
|
err = strategy_filter.CheckFilters(info.Driver, strategy_dto.Scope(info.Scope), *input.Filters)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
data, _ := json.Marshal(input.Filters)
|
|
filters = string(data)
|
|
}
|
|
cfg := info.Config
|
|
if input.Config != nil {
|
|
err = strategy_driver.CheckConfig(info.Driver, input.Config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
data, _ := json.Marshal(input.Config)
|
|
cfg = string(data)
|
|
}
|
|
|
|
return i.strategyService.Save(ctx, id, &strategy.Edit{
|
|
Name: input.Name,
|
|
Priority: input.Priority,
|
|
Desc: input.Desc,
|
|
Filters: &filters,
|
|
Config: &cfg,
|
|
})
|
|
}
|
|
|
|
func (i *imlStrategyModule) Enable(ctx context.Context, id string) error {
|
|
stop := false
|
|
return i.strategyService.Save(ctx, id, &strategy.Edit{IsStop: &stop})
|
|
}
|
|
|
|
func (i *imlStrategyModule) Disable(ctx context.Context, id string) error {
|
|
stop := true
|
|
return i.strategyService.Save(ctx, id, &strategy.Edit{IsStop: &stop})
|
|
}
|
|
|
|
func (i *imlStrategyModule) Publish(ctx context.Context, driver string, scope string, target string) error {
|
|
d, has := strategy_driver.GetDriver(driver)
|
|
if !has {
|
|
return fmt.Errorf("driver not found: %s", driver)
|
|
}
|
|
list, err := i.strategyService.AllByDriver(ctx, driver, strategy_dto.ToScope(scope).Int(), target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.transaction.Transaction(ctx, func(txCtx context.Context) error {
|
|
publishStrategies := make([]*eosc.Base[gateway.StrategyRelease], 0, len(list))
|
|
for _, l := range list {
|
|
if l.IsDelete {
|
|
err = i.strategyService.Delete(ctx, l.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
publishStrategies = append(publishStrategies, d.ToRelease(strategy_dto.ToStrategy(l), nil, 5000))
|
|
|
|
err = i.strategyService.CommitStrategy(txCtx, scope, target, l.Id, l)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
client, err := i.clusterService.GatewayClient(ctx, cluster.DefaultClusterID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
_ = client.Close(ctx)
|
|
}()
|
|
return client.Strategy().Online(ctx, publishStrategies...)
|
|
})
|
|
}
|
|
|
|
func (i *imlStrategyModule) Delete(ctx context.Context, id string) error {
|
|
_, err := i.strategyService.LatestStrategyCommit(ctx, strategy_dto.ScopeGlobal, "", id)
|
|
if err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil
|
|
}
|
|
return i.strategyService.Delete(ctx, id)
|
|
}
|
|
return i.strategyService.SortDelete(ctx, id)
|
|
}
|
|
|
|
func (i *imlStrategyModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
|
|
commits, err := i.strategyService.ListLatestStrategyCommit(ctx, strategy_dto.ScopeGlobal, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
publishStrategies := make([]*eosc.Base[gateway.StrategyRelease], 0, len(commits))
|
|
for _, c := range commits {
|
|
l := c.Data
|
|
if l.IsDelete {
|
|
err = i.strategyService.Delete(ctx, l.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
d, has := strategy_driver.GetDriver(l.Driver)
|
|
if !has {
|
|
continue
|
|
}
|
|
publishStrategies = append(publishStrategies, d.ToRelease(strategy_dto.ToStrategy(&strategy.Strategy{
|
|
Id: l.Id,
|
|
Name: l.Name,
|
|
Priority: l.Priority,
|
|
Filters: l.Filters,
|
|
Config: l.Config,
|
|
Driver: l.Driver,
|
|
IsStop: l.IsStop,
|
|
IsDelete: l.IsDelete,
|
|
}), nil, 5000))
|
|
}
|
|
|
|
return clientDriver.Strategy().Online(ctx, publishStrategies...)
|
|
}
|