finish log list

This commit is contained in:
Liujian
2025-04-29 17:44:53 +08:00
parent 771c86229d
commit a3bebde83c
19 changed files with 676 additions and 122 deletions
+62
View File
@@ -0,0 +1,62 @@
package common
import (
"fmt"
"strconv"
)
func FormatCount(count int64) string {
switch {
case count < 1000:
return strconv.FormatInt(count, 10)
case count < 1000000:
return fmt.Sprintf("%.1fK", float64(count)/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", float64(count)/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
default:
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
}
}
func FormatTime(t int64) string {
if t < 1000 {
return strconv.FormatInt(t, 10) + "ms"
}
if t < 1000000 {
return fmt.Sprintf("%.1fs", float64(t)/1000)
}
if t < 1000000000 {
return fmt.Sprintf("%.1fmin", float64(t)/1000000)
}
if t < 1000000000000 {
return fmt.Sprintf("%.1fhour", float64(t)/1000000000)
}
return fmt.Sprintf("%.1D", float64(t)/1000000000000)
}
func FormatByte(b int64) string {
const (
KB = 1000
MB = KB * 1000
GB = MB * 1000
TB = GB * 1000
PB = TB * 1000
)
switch {
case b < KB:
return fmt.Sprintf("%dB", b)
case b < MB:
return fmt.Sprintf("%.1fKB", float64(b)/KB)
case b < GB:
return fmt.Sprintf("%.1fMB", float64(b)/MB)
case b < TB:
return fmt.Sprintf("%.1fGB", float64(b)/GB)
case b < PB:
return fmt.Sprintf("%.1fTB", float64(b)/TB)
default:
return fmt.Sprintf("%.1fPB", float64(b)/PB)
}
}
+52 -4
View File
@@ -59,10 +59,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
//var (
// ollamaConfig = "{\n \"mirostat\": 0,\n \"mirostat_eta\": 0.1,\n \"mirostat_tau\": 5.0,\n \"num_ctx\": 4096,\n \"repeat_last_n\":64,\n \"repeat_penalty\": 1.1,\n \"temperature\": 0.7,\n \"seed\": 42,\n \"num_predict\": 42,\n \"top_k\": 40,\n \"top_p\": 0.9,\n \"min_p\": 0.5\n}\n"
//)
var ( var (
_ IServiceController = (*imlServiceController)(nil) _ IServiceController = (*imlServiceController)(nil)
@@ -88,6 +84,58 @@ type imlServiceController struct {
transaction store.ITransaction `autowired:""` transaction store.ITransaction `autowired:""`
} }
func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.AILogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.RestLogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) { func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) {
o, err := i.module.ServiceOverview(ctx, serviceId) o, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil { if err != nil {
+4
View File
@@ -41,6 +41,10 @@ type IServiceController interface {
RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error) RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error)
ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error)
AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error)
} }
type IAppController interface { type IAppController interface {
+1
View File
@@ -10,6 +10,7 @@ type ILogDriver interface {
LogInfo(clusterId string, id string) (*LogInfo, error) LogInfo(clusterId string, id string) (*LogInfo, error)
LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error)
LogRecords(clusterId string, start time.Time, end time.Time) ([]*Log, error)
} }
var ( var (
+10
View File
@@ -6,12 +6,22 @@ import (
type Log struct { type Log struct {
ID string ID string
Strategy string
Service string Service string
API string
Method string Method string
Url string Url string
RemoteIP string RemoteIP string
Consumer string Consumer string
Authorization string Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time RecordTime time.Time
} }
+26 -21
View File
@@ -53,25 +53,30 @@ type LogInfo struct {
} }
type LogDetail struct { type LogDetail struct {
Api string `json:"api"` Api string `json:"api"`
Application string `json:"application"` Application string `json:"application"`
Strategy string `json:"strategy"` Strategy string `json:"strategy"`
ContentType string `json:"content_type"` ContentType string `json:"content_type"`
Cluster string `json:"cluster"` Cluster string `json:"cluster"`
Msec string `json:"msec"` Msec string `json:"msec"`
Node string `json:"node"` Node string `json:"node"`
RequestId string `json:"request_id"` RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"` RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"` RequestScheme string `json:"request_scheme"`
RequestTime string `json:"request_time"` RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"` RequestUri string `json:"request_uri"`
RequestBody string `json:"request_body"` RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"` ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"` ResponseBody string `json:"response_body"`
ProxyResponseBody string `json:"proxy_response_body"` ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"` Service string `json:"service"`
Provider string `json:"provider"` Provider string `json:"provider"`
Authorization string `json:"authorization"` Authorization string `json:"authorization"`
SrcIp string `json:"src_ip"` SrcIp string `json:"src_ip"`
Status string `json:"status"` Status string `json:"status"`
AIProvider string `json:"ai_provider"`
AIModel string `json:"ai_model"`
AIModelInputToken interface{} `json:"ai_model_input_token"`
AIModelOutputToken interface{} `json:"ai_model_output_token"`
AIModelTotalToken interface{} `json:"ai_model_total_token"`
} }
+48 -1
View File
@@ -132,6 +132,24 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH
return result, nil return result, nil
} }
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.Log, error) {
if start.After(end) {
return nil, fmt.Errorf("start time is greater than end time")
}
queries := url.Values{}
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json", clusterId))
queries.Set("direction", "backward")
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
log.Debug("query is ", queries.Get("query"))
logs, err := d.recuseLogs(queries, end, 1)
if err != nil {
return nil, err
}
return logs, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) { func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
if start.After(end) { if start.After(end) {
return nil, 0, fmt.Errorf("start time is greater than end time") return nil, 0, fmt.Errorf("start time is greater than end time")
@@ -205,15 +223,24 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
} }
detail := l.Stream detail := l.Stream
msec, _ := strconv.ParseInt(detail.Msec, 10, 64) msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
logs = append(logs, &log_driver.Log{ logs = append(logs, &log_driver.Log{
ID: detail.RequestId, ID: detail.RequestId,
Strategy: detail.Strategy,
Service: detail.Provider, Service: detail.Provider,
API: detail.Api,
Method: detail.RequestMethod, Method: detail.RequestMethod,
Url: detail.RequestUri, Url: detail.RequestUri,
RemoteIP: detail.SrcIp, RemoteIP: detail.SrcIp,
Consumer: detail.Application, Consumer: detail.Application,
Authorization: detail.Authorization, Authorization: detail.Authorization,
InputToken: parseToInt64(detail.AIModelInputToken),
OutputToken: parseToInt64(detail.AIModelOutputToken),
TotalToken: parseToInt64(detail.AIModelTotalToken),
AIProvider: detail.AIProvider,
AIModel: detail.AIModel,
StatusCode: parseToInt64(detail.Status),
ResponseTime: parseToInt64(detail.RequestTime),
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
RecordTime: time.UnixMilli(msec), RecordTime: time.UnixMilli(msec),
}) })
} }
@@ -223,6 +250,26 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
return logs, nil return logs, nil
} }
func parseToInt64(v interface{}) int64 {
switch t := v.(type) {
case int:
return int64(t)
case int64:
return t
case string:
if v == "" {
return 0
}
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return 0
}
return i
default:
return 0
}
}
func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) { func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) {
// 先查在这段时间内符合条件的日志数量 // 先查在这段时间内符合条件的日志数量
queries := url.Values{} queries := url.Values{}
+178 -4
View File
@@ -4,9 +4,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"time" "time"
"github.com/eolinker/go-common/server"
log_driver "github.com/APIParkLab/APIPark/log-driver" log_driver "github.com/APIParkLab/APIPark/log-driver"
"github.com/eolinker/go-common/register"
"github.com/eolinker/go-common/utils"
"github.com/APIParkLab/APIPark/gateway" "github.com/APIParkLab/APIPark/gateway"
@@ -16,11 +21,11 @@ import (
"github.com/APIParkLab/APIPark/service/cluster" "github.com/APIParkLab/APIPark/service/cluster"
"github.com/eolinker/go-common/auto"
log_dto "github.com/APIParkLab/APIPark/module/log/dto" log_dto "github.com/APIParkLab/APIPark/module/log/dto"
"github.com/APIParkLab/APIPark/service/log" "github.com/APIParkLab/APIPark/service/log"
eosc_log "github.com/eolinker/eosc/log"
log_print "github.com/eolinker/eosc/log" log_print "github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto"
) )
var _ ILogModule = (*imlLogModule)(nil) var _ ILogModule = (*imlLogModule)(nil)
@@ -28,7 +33,10 @@ var _ ILogModule = (*imlLogModule)(nil)
type imlLogModule struct { type imlLogModule struct {
service log.ILogService `autowired:""` service log.ILogService `autowired:""`
clusterService cluster.IClusterService `autowired:""` clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
transaction store.ITransaction `autowired:""`
//scheduleCtx context.Context
scheduleCancel context.CancelFunc
} }
var labels = map[string]string{ var labels = map[string]string{
@@ -70,6 +78,11 @@ var logFormatter = map[string]interface{}{
"$authorization", "$authorization",
"$response_body", "$response_body",
"$proxy_response_body", "$proxy_response_body",
"$ai_provider",
"$ai_model",
"$ai_model_input_token",
"$ai_model_output_token",
"$ai_model_total_token",
}, },
} }
@@ -135,6 +148,11 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S
return err return err
} }
log_driver.SetDriver(driver, d) log_driver.SetDriver(driver, d)
newCtx, cancel := context.WithCancel(context.Background())
newCtx = utils.SetUserId(newCtx, "admin")
i.scheduleCancel()
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(newCtx)
return nil return nil
}) })
} }
@@ -164,8 +182,15 @@ func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSour
}, nil }, nil
} }
func (i *imlLogModule) OnComplete() { func (i *imlLogModule) OnInit() {
register.Handle(func(v server.Server) {
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.SetUserId(ctx, "admin")
//i.scheduleCtx = ctx
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(ctx)
})
} }
func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error { func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
@@ -222,3 +247,152 @@ func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, client
return nil return nil
} }
const (
oneSecond = 1
oneMinute = 60
oneHour = 60 * oneMinute
oneDay = 24 * oneHour
)
// 定时更新历史记录
func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) {
driver, has := log_driver.GetDriver("loki")
if !has {
eosc_log.Error("driver loki not found")
return
}
info, err := i.service.GetLogSource(ctx, "loki")
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
eosc_log.Errorf("get log source loki error: %s", err)
return
}
return
}
now := time.Now()
before90Days := now.Add(-7 * 24 * time.Hour)
beginTime := before90Days
if info.LastPullTime.After(before90Days) {
before90Days = info.LastPullTime
}
pauseTime := now
historyFinish := false
go func() {
eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(1 * time.Minute)
defer ticket.Stop()
for {
now = time.Now()
select {
case <-ctx.Done():
return
case <-ticket.C:
switch {
case now.Sub(beginTime) > oneDay:
endTime := beginTime.Add(oneDay)
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
beginTime = endTime
case now.Sub(pauseTime) <= oneDay:
endTime := pauseTime
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
historyFinish = true
return
}
historyFinish = true
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
return
}
eosc_log.Infof("update log record finish")
return
}
}
}
}()
go func() {
eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(10 * time.Second)
defer ticket.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticket.C:
end := time.Now()
start := end.Add(-1 * time.Minute)
err = i.updateLogRecord(ctx, driver, start, end)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
if historyFinish {
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &end,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
}
}
}
}()
}
func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
logs, err := driver.LogRecords(c.Cluster, start, end)
if err != nil {
return fmt.Errorf("get log records error: %s", err)
}
for _, l := range logs {
err = i.service.InsertLog(ctx, "loki", &log.InsertLog{
ID: l.ID,
Driver: "loki",
Strategy: l.Strategy,
API: l.API,
Service: l.Service,
Method: l.Method,
Url: l.Url,
RemoteIP: l.RemoteIP,
Consumer: l.Consumer,
Authorization: l.Authorization,
InputToken: l.InputToken,
OutputToken: l.OutputToken,
TotalToken: l.TotalToken,
AIProvider: l.AIProvider,
AIModel: l.AIModel,
StatusCode: l.StatusCode,
ResponseTime: l.ResponseTime,
Traffic: l.Traffic,
RecordTime: l.RecordTime,
})
if err != nil {
eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID)
continue
}
}
return nil
}
-42
View File
@@ -1,51 +1,9 @@
package monitor package monitor
import ( import (
"fmt"
"strconv"
"time" "time"
) )
func formatCount(count int64) string {
switch {
case count < 1000:
return strconv.FormatInt(count, 10)
case count < 1000000:
return fmt.Sprintf("%.1fK", float64(count)/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", float64(count)/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
default:
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
}
}
func formatByte(b int64) string {
const (
KB = 1000
MB = KB * 1000
GB = MB * 1000
TB = GB * 1000
PB = TB * 1000
)
switch {
case b < KB:
return fmt.Sprintf("%dB", b)
case b < MB:
return fmt.Sprintf("%.1fKB", float64(b)/KB)
case b < GB:
return fmt.Sprintf("%.1fMB", float64(b)/MB)
case b < TB:
return fmt.Sprintf("%.1fGB", float64(b)/GB)
case b < PB:
return fmt.Sprintf("%.1fTB", float64(b)/TB)
default:
return fmt.Sprintf("%.1fPB", float64(b)/PB)
}
}
const ( const (
oneMinute = 60 oneMinute = 60
oneHour = 3600 oneHour = 3600
+15 -13
View File
@@ -9,6 +9,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/APIParkLab/APIPark/common"
"github.com/APIParkLab/APIPark/gateway" "github.com/APIParkLab/APIPark/gateway"
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto" "github.com/eolinker/go-common/auto"
@@ -131,8 +133,8 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
Status5xx: item.Status5xx, Status5xx: item.Status5xx,
}) })
} }
result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) result.AvgRequestPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum)
result.RequestTotal = formatCount(summary.StatusTotal) result.RequestTotal = common.FormatCount(summary.StatusTotal)
}() }()
go func() { go func() {
@@ -169,11 +171,11 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service
}) })
} }
result.AvgTokenPerSubscriber = formatCount(summary.TotalToken / subscriberNum) result.AvgTokenPerSubscriber = common.FormatCount(summary.TotalToken / subscriberNum)
result.MaxToken = fmt.Sprintf("%s/s", formatCount(maxToken/timeInterval)) result.MaxToken = fmt.Sprintf("%s/s", common.FormatCount(maxToken/timeInterval))
result.MinToken = fmt.Sprintf("%s/s", formatCount(minToken/timeInterval)) result.MinToken = fmt.Sprintf("%s/s", common.FormatCount(minToken/timeInterval))
result.AvgToken = fmt.Sprintf("%s/s", formatCount(summary.OutputToken/timeInterval)) result.AvgToken = fmt.Sprintf("%s/s", common.FormatCount(summary.OutputToken/timeInterval))
result.TokenTotal = formatCount(summary.TotalToken) result.TokenTotal = common.FormatCount(summary.TotalToken)
}() }()
go func() { go func() {
wg.Wait() wg.Wait()
@@ -250,8 +252,8 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi
Status5xx: item.Status5xx, Status5xx: item.Status5xx,
}) })
} }
result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) result.AvgRequestPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum)
result.RequestTotal = formatCount(summary.StatusTotal) result.RequestTotal = common.FormatCount(summary.StatusTotal)
}() }()
go func() { go func() {
@@ -288,7 +290,7 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi
}) })
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, item.StatusTotal/subscriberNum) result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, item.StatusTotal/subscriberNum)
} }
result.AvgTrafficPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) result.AvgTrafficPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum)
}() }()
go func() { go func() {
wg.Wait() wg.Wait()
@@ -310,13 +312,13 @@ func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *m
n := &monitor_dto.TopN{ n := &monitor_dto.TopN{
Id: id, Id: id,
Name: name, Name: name,
Request: formatCount(item.Request), Request: common.FormatCount(item.Request),
} }
switch apiKind { switch apiKind {
case "rest": case "rest":
n.Traffic = formatByte(item.Traffic) n.Traffic = common.FormatByte(item.Traffic)
case "ai": case "ai":
n.Token = formatCount(item.Token) n.Token = common.FormatCount(item.Token)
} }
return n return n
} }
+23
View File
@@ -236,3 +236,26 @@ type Overview struct {
Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"` Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"`
APINum int64 `json:"api_num"` APINum int64 `json:"api_num"`
} }
type AILogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Token int64 `json:"token"`
TokenPerSecond int64 `json:"token_per_second"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
}
type RestLogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
}
+44
View File
@@ -9,6 +9,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/APIParkLab/APIPark/common"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/eolinker/go-common/register" "github.com/eolinker/go-common/register"
@@ -27,6 +29,7 @@ import (
model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime" model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime"
"github.com/APIParkLab/APIPark/resources/access" "github.com/APIParkLab/APIPark/resources/access"
log_service "github.com/APIParkLab/APIPark/service/log"
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/server" "github.com/eolinker/go-common/server"
@@ -87,10 +90,51 @@ type imlServiceModule struct {
releaseService release.IReleaseService `autowired:""` releaseService release.IReleaseService `autowired:""`
serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""` serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""`
logService log_service.ILogService `autowired:""`
transaction store.ITransaction `autowired:""` transaction store.ITransaction `autowired:""`
} }
func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.RestLogItem {
return &service_dto.RestLogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Consumer: auto.UUID(s.Consumer),
ResponseTime: common.FormatTime(s.ResponseTime),
Traffic: common.FormatByte(s.Traffic),
}
}), total, nil
}
func (i *imlServiceModule) AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.AILogItem {
return &service_dto.AILogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Token: s.TotalToken,
TokenPerSecond: s.TotalToken / s.ResponseTime,
Consumer: auto.UUID(s.Consumer),
Provider: auto.UUID(s.AIProvider),
Model: s.AIModel,
}
}), total, nil
}
func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) { func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) {
info, err := i.serviceService.Get(ctx, id) info, err := i.serviceService.Get(ctx, id)
if err != nil { if err != nil {
+6
View File
@@ -36,6 +36,12 @@ type IServiceModule interface {
MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error) MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error)
ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error)
ILogModule
}
type ILogModule interface {
AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error)
} }
type IServiceDocModule interface { type IServiceDocModule interface {
+3
View File
@@ -44,5 +44,8 @@ func (p *plugin) ServiceApis() []pm3.Api {
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
} }
} }
+85 -2
View File
@@ -23,7 +23,77 @@ var (
) )
type imlLogService struct { type imlLogService struct {
store log_source.ILogSourceStore `autowired:""` store log_source.ILogSourceStore `autowired:""`
logRecordStore log_source.ILogRecordStore `autowired:""`
}
func (i *imlLogService) LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) {
list, total, err := i.logRecordStore.ListPage(ctx, "`record_time` between ? and ? and `service` = ?", page, size, []interface{}{
start,
end,
serviceId,
}, "record_time desc")
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_source.LogRecord) *Item {
return &Item{
ID: s.UUID,
Strategy: s.Strategy,
Service: s.Service,
API: s.API,
Method: s.Method,
Url: s.Url,
RemoteIP: s.RemoteIP,
Consumer: s.Consumer,
Authorization: s.Authorization,
InputToken: s.InputToken,
OutputToken: s.OutputToken,
TotalToken: s.TotalToken,
AIProvider: s.AIProvider,
AIModel: s.AIModel,
StatusCode: s.StatusCode,
ResponseTime: s.ResponseTime,
Traffic: s.Traffic,
RecordTime: s.RecordTime,
}
}), total, nil
}
func (i *imlLogService) InsertLog(ctx context.Context, driver string, input *InsertLog) error {
// 判断日志是否已存在,若已存在,则不插入
_, err := i.logRecordStore.First(ctx, map[string]interface{}{"uuid": input.ID})
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
log_print.Errorf("get log record %s error: %s", input.ID, err)
return err
}
return i.logRecordStore.Insert(ctx, &log_source.LogRecord{
UUID: input.ID,
Driver: input.Driver,
Service: input.Service,
API: input.API,
Strategy: input.Strategy,
Method: input.Method,
Url: input.Url,
RemoteIP: input.RemoteIP,
Consumer: input.Consumer,
Authorization: input.Authorization,
InputToken: input.InputToken,
OutputToken: input.OutputToken,
TotalToken: input.TotalToken,
AIProvider: input.AIProvider,
AIModel: input.AIModel,
StatusCode: input.StatusCode,
ResponseTime: input.ResponseTime,
Traffic: input.Traffic,
RecordTime: input.RecordTime,
})
}
return nil
} }
func (i *imlLogService) OnComplete() { func (i *imlLogService) OnComplete() {
@@ -67,9 +137,10 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
if input.Config == nil || *input.Config == "" { if input.Config == nil || *input.Config == "" {
return errors.New("config is required") return errors.New("config is required")
} }
now := time.Now() now := time.Now()
userId := utils.UserId(ctx) userId := utils.UserId(ctx)
s = &log_source.Log{ s = &log_source.LogSource{
UUID: input.ID, UUID: input.ID,
Cluster: *input.Cluster, Cluster: *input.Cluster,
Driver: driver, Driver: driver,
@@ -79,11 +150,19 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
CreateAt: now, CreateAt: now,
UpdateAt: now, UpdateAt: now,
} }
if input.LastPullTime == nil {
s.LastPullAt = time.Now().Add(-24 * time.Hour)
} else {
s.LastPullAt = *input.LastPullTime
}
} else { } else {
if input.Config != nil && *input.Config != "" { if input.Config != nil && *input.Config != "" {
s.Config = *input.Config s.Config = *input.Config
} }
if input.LastPullTime != nil {
s.LastPullAt = *input.LastPullTime
}
s.Updater = utils.UserId(ctx) s.Updater = utils.UserId(ctx)
s.UpdateAt = time.Now() s.UpdateAt = time.Now()
} }
@@ -129,6 +208,10 @@ func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string,
return result, count, nil return result, count, nil
} }
func (i *imlLogService) LogRecords(ctx context.Context, driver string, keyword string, start time.Time, end time.Time) ([]*Item, int64, error) {
panic(errors.New("not implemented"))
}
func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) { func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) {
d, has := log_driver.GetDriver(driver) d, has := log_driver.GetDriver(driver)
if !has { if !has {
+56 -21
View File
@@ -7,43 +7,78 @@ import (
) )
type Save struct { type Save struct {
ID string ID string
Cluster *string Cluster *string
Config *string Config *string
LastPullTime *time.Time
} }
type Source struct { type Source struct {
ID string ID string
Cluster string Cluster string
Driver string Driver string
Config string Config string
Creator string Creator string
Updater string Updater string
CreateAt time.Time CreateAt time.Time
UpdateAt time.Time UpdateAt time.Time
LastPullTime time.Time
} }
func FromEntity(ov *log_source.Log) *Source { func FromEntity(ov *log_source.LogSource) *Source {
return &Source{ return &Source{
ID: ov.UUID, ID: ov.UUID,
Cluster: ov.Cluster, Cluster: ov.Cluster,
Driver: ov.Driver, Driver: ov.Driver,
Config: ov.Config, Config: ov.Config,
Creator: ov.Creator, Creator: ov.Creator,
Updater: ov.Updater, Updater: ov.Updater,
CreateAt: ov.CreateAt, LastPullTime: ov.LastPullAt,
UpdateAt: ov.UpdateAt, CreateAt: ov.CreateAt,
UpdateAt: ov.UpdateAt,
} }
} }
type Item struct { type InsertLog struct {
ID string ID string
Driver string
Strategy string
Service string Service string
API string
Method string Method string
Url string Url string
RemoteIP string RemoteIP string
Consumer string Consumer string
Authorization string Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type Item struct {
ID string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time RecordTime time.Time
} }
+6
View File
@@ -13,8 +13,14 @@ type ILogService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error UpdateLogSource(ctx context.Context, driver string, input *Save) error
GetLogSource(ctx context.Context, driver string) (*Source, error) GetLogSource(ctx context.Context, driver string) (*Source, error)
Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error) Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error)
LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error)
LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error) LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error)
InsertLog(ctx context.Context, driver string, input *InsertLog) error
}
type ILogUpdateService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error
} }
func init() { func init() {
+44 -12
View File
@@ -2,22 +2,54 @@ package log_source
import "time" import "time"
type Log struct { type LogSource struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"` Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"` Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"` LastPullAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:last_pull_at;comment:最后拉取时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"` CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
} }
func (c *Log) IdValue() int64 { func (c *LogSource) IdValue() int64 {
return c.Id return c.Id
} }
func (c *Log) TableName() string { func (c *LogSource) TableName() string {
return "log" return "log"
} }
type LogRecord struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Service string `gorm:"column:service;type:varchar(36);NOT NULL;comment:服务ID"`
API string `gorm:"column:api;type:varchar(36);NOT NULL;comment:接口ID"`
Strategy string `gorm:"column:strategy;type:varchar(36);NOT NULL;comment:策略ID"`
Method string `gorm:"column:method;type:varchar(36);NOT NULL;comment:请求方法"`
Url string `gorm:"column:url;type:varchar(255);NOT NULL;comment:请求URL"`
RemoteIP string `gorm:"column:remote_ip;type:varchar(255);NOT NULL;comment:请求IP"`
Consumer string `gorm:"column:consumer;type:varchar(255);NOT NULL;comment:消费者ID"`
Authorization string `gorm:"column:authorization;type:varchar(255);NOT NULL;comment:鉴权ID"`
InputToken int64 `gorm:"column:input_token;type:int(11);NOT NULL;comment:输入令牌"`
OutputToken int64 `gorm:"column:output_token;type:int(11);NOT NULL;comment:输出令牌"`
TotalToken int64 `gorm:"column:total_token;type:int(11);NOT NULL;comment:总令牌"`
AIProvider string `gorm:"column:ai_provider;type:varchar(255);NOT NULL;comment:AI提供商"`
AIModel string `gorm:"column:ai_model;type:varchar(255);NOT NULL;comment:AI模型"`
StatusCode int64 `gorm:"column:status_code;type:int(11);NOT NULL;comment:请求状态码"`
ResponseTime int64 `gorm:"column:response_time;type:int(11);NOT NULL;comment:响应时间"`
Traffic int64 `gorm:"column:traffic;type:BIGINT(20);NOT NULL;comment:流量"`
RecordTime time.Time `gorm:"column:record_time;type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;comment:记录时间"`
}
func (c *LogRecord) IdValue() int64 {
return c.Id
}
func (c *LogRecord) TableName() string {
return "log_record"
}
+13 -2
View File
@@ -8,15 +8,26 @@ import (
) )
type ILogSourceStore interface { type ILogSourceStore interface {
store.IBaseStore[Log] store.IBaseStore[LogSource]
} }
type storeLogSource struct { type storeLogSource struct {
store.Store[Log] store.Store[LogSource]
}
type ILogRecordStore interface {
store.IBaseStore[LogRecord]
}
type storeLogRecord struct {
store.Store[LogRecord]
} }
func init() { func init() {
autowire.Auto[ILogSourceStore](func() reflect.Value { autowire.Auto[ILogSourceStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogSource)) return reflect.ValueOf(new(storeLogSource))
}) })
autowire.Auto[ILogRecordStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogRecord))
})
} }