mirror of
https://github.com/APIParkLab/APIPark.git
synced 2026-06-04 10:13:53 +08:00
400 lines
9.7 KiB
Go
400 lines
9.7 KiB
Go
package log
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/eolinker/go-common/server"
|
|
|
|
log_driver "github.com/APIParkLab/APIPark/log-driver"
|
|
"github.com/eolinker/go-common/register"
|
|
"github.com/eolinker/go-common/utils"
|
|
|
|
"github.com/APIParkLab/APIPark/gateway"
|
|
|
|
"github.com/eolinker/go-common/store"
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/APIParkLab/APIPark/service/cluster"
|
|
|
|
log_dto "github.com/APIParkLab/APIPark/module/log/dto"
|
|
"github.com/APIParkLab/APIPark/service/log"
|
|
eosc_log "github.com/eolinker/eosc/log"
|
|
log_print "github.com/eolinker/eosc/log"
|
|
"github.com/eolinker/go-common/auto"
|
|
)
|
|
|
|
var _ ILogModule = (*imlLogModule)(nil)
|
|
|
|
type imlLogModule struct {
|
|
service log.ILogService `autowired:""`
|
|
clusterService cluster.IClusterService `autowired:""`
|
|
|
|
transaction store.ITransaction `autowired:""`
|
|
//scheduleCtx context.Context
|
|
scheduleCancel context.CancelFunc
|
|
}
|
|
|
|
var labels = map[string]string{
|
|
"cluster": "$cluster",
|
|
"node": "$node",
|
|
}
|
|
var logFormatter = map[string]interface{}{
|
|
"fields": []string{
|
|
"$msec",
|
|
"$service",
|
|
"$provider",
|
|
"$scheme as request_scheme",
|
|
"$url as request_uri",
|
|
"$host as request_host",
|
|
"$header as request_header",
|
|
"$remote_addr",
|
|
"$request_body",
|
|
"$proxy_body",
|
|
"$proxy_method",
|
|
"$proxy_scheme",
|
|
"$proxy_uri",
|
|
"$api",
|
|
"$proxy_host",
|
|
"$proxy_header",
|
|
"$proxy_addr",
|
|
"$response_header",
|
|
"$response_headers",
|
|
"$status",
|
|
"$content_type",
|
|
"$proxy_status",
|
|
"$request_time",
|
|
"$response_time",
|
|
"$node",
|
|
"$cluster",
|
|
"$application",
|
|
"$src_ip",
|
|
"$block_name as strategy",
|
|
"$request_id",
|
|
"$request_method",
|
|
"$authorization",
|
|
"$response_body",
|
|
"$proxy_response_body",
|
|
"$ai_provider",
|
|
"$ai_model",
|
|
"$ai_model_input_token",
|
|
"$ai_model_output_token",
|
|
"$ai_model_total_token",
|
|
},
|
|
}
|
|
|
|
func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.Save) error {
|
|
factory, has := log_driver.GetFactory(driver)
|
|
if !has {
|
|
return errors.New("driver not found")
|
|
}
|
|
input.Cluster = cluster.DefaultClusterID
|
|
var cfg *string
|
|
if input.Config != nil {
|
|
data, _ := json.Marshal(input.Config)
|
|
tmp := string(data)
|
|
cfg = &tmp
|
|
}
|
|
return i.transaction.Transaction(ctx, func(txCtx context.Context) error {
|
|
err := i.service.UpdateLogSource(txCtx, driver, &log.Save{
|
|
ID: input.ID,
|
|
Cluster: &input.Cluster,
|
|
Config: cfg,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
info, err := i.service.GetLogSource(txCtx, driver)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d, c, err := factory.Create(info.Config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client, err := i.clusterService.GatewayClient(txCtx, input.Cluster)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close(txCtx)
|
|
dynamicClient, err := client.Dynamic(driver)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
attr := make(map[string]interface{})
|
|
attr["driver"] = driver
|
|
attr["formatter"] = logFormatter
|
|
attr["labels"] = labels
|
|
attr["method"] = "POST"
|
|
attr["scopes"] = []string{"access_log"}
|
|
attr["type"] = "json"
|
|
for k, v := range c {
|
|
attr[k] = v
|
|
}
|
|
err = dynamicClient.Online(txCtx, &gateway.DynamicRelease{
|
|
BasicItem: &gateway.BasicItem{
|
|
ID: driver,
|
|
Description: "collect access log",
|
|
Version: time.Now().Format("20060102150405"),
|
|
Resource: gateway.ProfessionOutput,
|
|
},
|
|
Attr: attr,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log_driver.SetDriver(driver, d)
|
|
newCtx, cancel := context.WithCancel(context.Background())
|
|
newCtx = utils.SetUserId(newCtx, "admin")
|
|
i.scheduleCancel()
|
|
i.scheduleCancel = cancel
|
|
i.scheduleUpdateLogRecord(newCtx)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSource, error) {
|
|
info, err := i.service.GetLogSource(ctx, driver)
|
|
if err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, err
|
|
}
|
|
return nil, nil
|
|
}
|
|
cfg := make(map[string]interface{})
|
|
if info.Config != "" {
|
|
err = json.Unmarshal([]byte(info.Config), &cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return &log_dto.LogSource{
|
|
ID: info.ID,
|
|
Config: cfg,
|
|
Creator: auto.UUID(info.Creator),
|
|
Updater: auto.UUID(info.Updater),
|
|
CreateAt: auto.TimeLabel(info.CreateAt),
|
|
UpdateAt: auto.TimeLabel(info.UpdateAt),
|
|
}, nil
|
|
}
|
|
|
|
func (i *imlLogModule) OnInit() {
|
|
register.Handle(func(v server.Server) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ctx = utils.SetUserId(ctx, "admin")
|
|
//i.scheduleCtx = ctx
|
|
i.scheduleCancel = cancel
|
|
i.scheduleUpdateLogRecord(ctx)
|
|
|
|
})
|
|
}
|
|
|
|
func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
|
|
drivers := log_driver.Drivers()
|
|
if len(drivers) < 1 {
|
|
return nil
|
|
}
|
|
|
|
for _, driver := range drivers {
|
|
factory, has := log_driver.GetFactory(driver)
|
|
if !has {
|
|
log_print.Errorf("driver %s not found", driver)
|
|
continue
|
|
}
|
|
info, err := i.service.GetLogSource(ctx, driver)
|
|
if err != nil {
|
|
log_print.Errorf("get log source %s error: %s", driver, err)
|
|
continue
|
|
}
|
|
d, c, err := factory.Create(info.Config)
|
|
if err != nil {
|
|
log_print.Errorf("create driver %s error: %s,config: %s", driver, err, info.Config)
|
|
continue
|
|
}
|
|
log_driver.SetDriver(driver, d)
|
|
dynamicClient, err := clientDriver.Dynamic(driver)
|
|
if err != nil {
|
|
log_print.Errorf("get dynamic client %s error: %s", driver, err)
|
|
continue
|
|
}
|
|
attr := make(map[string]interface{})
|
|
attr["driver"] = driver
|
|
attr["formatter"] = logFormatter
|
|
attr["labels"] = labels
|
|
attr["method"] = "POST"
|
|
for k, v := range c {
|
|
attr[k] = v
|
|
}
|
|
err = dynamicClient.Online(ctx, &gateway.DynamicRelease{
|
|
BasicItem: &gateway.BasicItem{
|
|
ID: driver,
|
|
Description: "collect access log",
|
|
Version: time.Now().Format("20060102150405"),
|
|
Resource: gateway.ProfessionOutput,
|
|
},
|
|
Attr: attr,
|
|
})
|
|
if err != nil {
|
|
log_print.Errorf("online driver %s error: %s", driver, err)
|
|
continue
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
const (
|
|
oneSecond = 1
|
|
oneMinute = 60
|
|
oneHour = 60 * oneMinute
|
|
oneDay = 24 * oneHour
|
|
)
|
|
|
|
// 定时更新历史记录
|
|
func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) {
|
|
driver, has := log_driver.GetDriver("loki")
|
|
if !has {
|
|
eosc_log.Error("driver loki not found")
|
|
return
|
|
}
|
|
info, err := i.service.GetLogSource(ctx, "loki")
|
|
if err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
eosc_log.Errorf("get log source loki error: %s", err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
now := time.Now()
|
|
before90Days := now.Add(-7 * 24 * time.Hour)
|
|
beginTime := before90Days
|
|
if info.LastPullTime.After(before90Days) {
|
|
before90Days = info.LastPullTime
|
|
}
|
|
pauseTime := now
|
|
historyFinish := false
|
|
go func() {
|
|
eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05"))
|
|
ticket := time.NewTicker(1 * time.Minute)
|
|
defer ticket.Stop()
|
|
for {
|
|
now = time.Now()
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticket.C:
|
|
switch {
|
|
case now.Sub(beginTime) > oneDay:
|
|
endTime := beginTime.Add(oneDay)
|
|
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
|
|
if err != nil {
|
|
eosc_log.Errorf("update log record error: %s", err)
|
|
continue
|
|
}
|
|
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
|
LastPullTime: &endTime,
|
|
})
|
|
if err != nil {
|
|
eosc_log.Errorf("update log source error: %s", err)
|
|
continue
|
|
}
|
|
beginTime = endTime
|
|
case now.Sub(pauseTime) <= oneDay:
|
|
endTime := pauseTime
|
|
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
|
|
if err != nil {
|
|
eosc_log.Errorf("update log record error: %s", err)
|
|
historyFinish = true
|
|
return
|
|
}
|
|
historyFinish = true
|
|
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
|
LastPullTime: &endTime,
|
|
})
|
|
if err != nil {
|
|
eosc_log.Errorf("update log source error: %s", err)
|
|
return
|
|
}
|
|
eosc_log.Infof("update log record finish")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05"))
|
|
ticket := time.NewTicker(10 * time.Second)
|
|
defer ticket.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticket.C:
|
|
end := time.Now()
|
|
start := end.Add(-1 * time.Minute)
|
|
err = i.updateLogRecord(ctx, driver, start, end)
|
|
if err != nil {
|
|
eosc_log.Errorf("update log record error: %s", err)
|
|
continue
|
|
}
|
|
if historyFinish {
|
|
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
|
LastPullTime: &end,
|
|
})
|
|
if err != nil {
|
|
eosc_log.Errorf("update log source error: %s", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error {
|
|
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
|
if err != nil {
|
|
return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
|
}
|
|
logs, err := driver.LogRecords(c.Cluster, start, end)
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("get log records error: %s", err)
|
|
}
|
|
for _, l := range logs {
|
|
err = i.service.InsertLog(ctx, "loki", &log.InsertLog{
|
|
ID: l.ID,
|
|
Driver: "loki",
|
|
Strategy: l.Strategy,
|
|
API: l.API,
|
|
Service: l.Service,
|
|
Method: l.Method,
|
|
Url: l.Url,
|
|
RemoteIP: l.RemoteIP,
|
|
Consumer: l.Consumer,
|
|
Authorization: l.Authorization,
|
|
InputToken: l.InputToken,
|
|
OutputToken: l.OutputToken,
|
|
TotalToken: l.TotalToken,
|
|
AIProvider: l.AIProvider,
|
|
AIModel: l.AIModel,
|
|
StatusCode: l.StatusCode,
|
|
ResponseTime: l.ResponseTime,
|
|
Traffic: l.Traffic,
|
|
RecordTime: l.RecordTime,
|
|
})
|
|
if err != nil {
|
|
eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID)
|
|
continue
|
|
}
|
|
}
|
|
return nil
|
|
}
|