diff --git a/common/format.go b/common/format.go new file mode 100644 index 00000000..e389aa1a --- /dev/null +++ b/common/format.go @@ -0,0 +1,62 @@ +package common + +import ( + "fmt" + "strconv" +) + +func FormatCount(count int64) string { + switch { + case count < 1000: + return strconv.FormatInt(count, 10) + case count < 1000000: + return fmt.Sprintf("%.1fK", float64(count)/1000) + case count < 1000000000: + return fmt.Sprintf("%.1fM", float64(count)/1000000) + case count < 1000000000000: + return fmt.Sprintf("%.1fB", float64(count)/1000000000) + default: + return fmt.Sprintf("%.1fT", float64(count)/1000000000000) + } +} + +func FormatTime(t int64) string { + if t < 1000 { + return strconv.FormatInt(t, 10) + "ms" + } + if t < 1000000 { + return fmt.Sprintf("%.1fs", float64(t)/1000) + } + if t < 1000000000 { + return fmt.Sprintf("%.1fmin", float64(t)/1000000) + } + if t < 1000000000000 { + return fmt.Sprintf("%.1fhour", float64(t)/1000000000) + } + return fmt.Sprintf("%.1D", float64(t)/1000000000000) +} + +func FormatByte(b int64) string { + const ( + KB = 1000 + MB = KB * 1000 + GB = MB * 1000 + TB = GB * 1000 + PB = TB * 1000 + ) + + switch { + case b < KB: + return fmt.Sprintf("%dB", b) + case b < MB: + return fmt.Sprintf("%.1fKB", float64(b)/KB) + case b < GB: + return fmt.Sprintf("%.1fMB", float64(b)/MB) + case b < TB: + return fmt.Sprintf("%.1fGB", float64(b)/GB) + case b < PB: + return fmt.Sprintf("%.1fTB", float64(b)/TB) + default: + return fmt.Sprintf("%.1fPB", float64(b)/PB) + } +} diff --git a/controller/service/iml.go b/controller/service/iml.go index e35f73d7..f5ed267c 100644 --- a/controller/service/iml.go +++ b/controller/service/iml.go @@ -59,10 +59,6 @@ import ( "github.com/google/uuid" ) -//var ( -// ollamaConfig = "{\n \"mirostat\": 0,\n \"mirostat_eta\": 0.1,\n \"mirostat_tau\": 5.0,\n \"num_ctx\": 4096,\n \"repeat_last_n\":64,\n \"repeat_penalty\": 1.1,\n \"temperature\": 0.7,\n \"seed\": 42,\n \"num_predict\": 42,\n \"top_k\": 40,\n \"top_p\": 0.9,\n \"min_p\": 0.5\n}\n" -//) - var ( _ IServiceController = (*imlServiceController)(nil) @@ -88,6 +84,58 @@ type imlServiceController struct { transaction store.ITransaction `autowired:""` } +func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, 0, err + } + if serviceId == "" { + return nil, 0, fmt.Errorf("service id is empty") + } + if page == "" { + page = "1" + } + if size == "" { + size = "20" + } + p, err := strconv.Atoi(page) + if err != nil { + return nil, 0, err + } + ps, err := strconv.Atoi(size) + if err != nil { + return nil, 0, err + } + + return i.module.AILogs(ctx, serviceId, s, e, p, ps) +} + +func (i *imlServiceController) RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) { + s, e, err := formatTime(start, end) + if err != nil { + return nil, 0, err + } + if serviceId == "" { + return nil, 0, fmt.Errorf("service id is empty") + } + if page == "" { + page = "1" + } + if size == "" { + size = "20" + } + p, err := strconv.Atoi(page) + if err != nil { + return nil, 0, err + } + ps, err := strconv.Atoi(size) + if err != nil { + return nil, 0, err + } + + return i.module.RestLogs(ctx, serviceId, s, e, p, ps) +} + func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) { o, err := i.module.ServiceOverview(ctx, serviceId) if err != nil { diff --git a/controller/service/service.go b/controller/service/service.go index 796a8a2e..350a93a1 100644 --- a/controller/service/service.go +++ b/controller/service/service.go @@ -41,6 +41,10 @@ type IServiceController interface { RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) + + AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) + + RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) } type IAppController interface { diff --git a/log-driver/driver.go b/log-driver/driver.go index f3f35329..d21132e6 100644 --- a/log-driver/driver.go +++ b/log-driver/driver.go @@ -10,6 +10,7 @@ type ILogDriver interface { LogInfo(clusterId string, id string) (*LogInfo, error) LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error) + LogRecords(clusterId string, start time.Time, end time.Time) ([]*Log, error) } var ( diff --git a/log-driver/entity.go b/log-driver/entity.go index cad3edd0..05920c36 100644 --- a/log-driver/entity.go +++ b/log-driver/entity.go @@ -6,12 +6,22 @@ import ( type Log struct { ID string + Strategy string Service string + API string Method string Url string RemoteIP string Consumer string Authorization string + InputToken int64 + OutputToken int64 + TotalToken int64 + AIProvider string + AIModel string + StatusCode int64 + ResponseTime int64 + Traffic int64 RecordTime time.Time } diff --git a/log-driver/loki/entity.go b/log-driver/loki/entity.go index 391983a3..df0caad3 100644 --- a/log-driver/loki/entity.go +++ b/log-driver/loki/entity.go @@ -53,25 +53,30 @@ type LogInfo struct { } type LogDetail struct { - Api string `json:"api"` - Application string `json:"application"` - Strategy string `json:"strategy"` - ContentType string `json:"content_type"` - Cluster string `json:"cluster"` - Msec string `json:"msec"` - Node string `json:"node"` - RequestId string `json:"request_id"` - RequestMethod string `json:"request_method"` - RequestScheme string `json:"request_scheme"` - RequestTime string `json:"request_time"` - RequestUri string `json:"request_uri"` - RequestBody string `json:"request_body"` - ProxyBody string `json:"proxy_body"` - ResponseBody string `json:"response_body"` - ProxyResponseBody string `json:"proxy_response_body"` - Service string `json:"service"` - Provider string `json:"provider"` - Authorization string `json:"authorization"` - SrcIp string `json:"src_ip"` - Status string `json:"status"` + Api string `json:"api"` + Application string `json:"application"` + Strategy string `json:"strategy"` + ContentType string `json:"content_type"` + Cluster string `json:"cluster"` + Msec string `json:"msec"` + Node string `json:"node"` + RequestId string `json:"request_id"` + RequestMethod string `json:"request_method"` + RequestScheme string `json:"request_scheme"` + RequestTime string `json:"request_time"` + RequestUri string `json:"request_uri"` + RequestBody string `json:"request_body"` + ProxyBody string `json:"proxy_body"` + ResponseBody string `json:"response_body"` + ProxyResponseBody string `json:"proxy_response_body"` + Service string `json:"service"` + Provider string `json:"provider"` + Authorization string `json:"authorization"` + SrcIp string `json:"src_ip"` + Status string `json:"status"` + AIProvider string `json:"ai_provider"` + AIModel string `json:"ai_model"` + AIModelInputToken interface{} `json:"ai_model_input_token"` + AIModelOutputToken interface{} `json:"ai_model_output_token"` + AIModelTotalToken interface{} `json:"ai_model_total_token"` } diff --git a/log-driver/loki/loki.go b/log-driver/loki/loki.go index 9f490fbc..ffafaf40 100644 --- a/log-driver/loki/loki.go +++ b/log-driver/loki/loki.go @@ -132,6 +132,24 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH return result, nil } +func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.Log, error) { + if start.After(end) { + return nil, fmt.Errorf("start time is greater than end time") + } + queries := url.Values{} + queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json", clusterId)) + queries.Set("direction", "backward") + queries.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + queries.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + log.Debug("query is ", queries.Get("query")) + logs, err := d.recuseLogs(queries, end, 1) + if err != nil { + return nil, err + } + + return logs, nil +} + func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) { if start.After(end) { return nil, 0, fmt.Errorf("start time is greater than end time") @@ -205,15 +223,24 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([] } detail := l.Stream msec, _ := strconv.ParseInt(detail.Msec, 10, 64) - logs = append(logs, &log_driver.Log{ ID: detail.RequestId, + Strategy: detail.Strategy, Service: detail.Provider, + API: detail.Api, Method: detail.RequestMethod, Url: detail.RequestUri, RemoteIP: detail.SrcIp, Consumer: detail.Application, Authorization: detail.Authorization, + InputToken: parseToInt64(detail.AIModelInputToken), + OutputToken: parseToInt64(detail.AIModelOutputToken), + TotalToken: parseToInt64(detail.AIModelTotalToken), + AIProvider: detail.AIProvider, + AIModel: detail.AIModel, + StatusCode: parseToInt64(detail.Status), + ResponseTime: parseToInt64(detail.RequestTime), + Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)), RecordTime: time.UnixMilli(msec), }) } @@ -223,6 +250,26 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([] return logs, nil } +func parseToInt64(v interface{}) int64 { + switch t := v.(type) { + case int: + return int64(t) + case int64: + return t + case string: + if v == "" { + return 0 + } + i, err := strconv.ParseInt(t, 10, 64) + if err != nil { + return 0 + } + return i + default: + return 0 + } +} + func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) { // 先查在这段时间内符合条件的日志数量 queries := url.Values{} diff --git a/module/log/iml.go b/module/log/iml.go index a6426e7a..71844ea3 100644 --- a/module/log/iml.go +++ b/module/log/iml.go @@ -4,9 +4,14 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" + "github.com/eolinker/go-common/server" + log_driver "github.com/APIParkLab/APIPark/log-driver" + "github.com/eolinker/go-common/register" + "github.com/eolinker/go-common/utils" "github.com/APIParkLab/APIPark/gateway" @@ -16,11 +21,11 @@ import ( "github.com/APIParkLab/APIPark/service/cluster" - "github.com/eolinker/go-common/auto" - log_dto "github.com/APIParkLab/APIPark/module/log/dto" "github.com/APIParkLab/APIPark/service/log" + eosc_log "github.com/eolinker/eosc/log" log_print "github.com/eolinker/eosc/log" + "github.com/eolinker/go-common/auto" ) var _ ILogModule = (*imlLogModule)(nil) @@ -28,7 +33,10 @@ var _ ILogModule = (*imlLogModule)(nil) type imlLogModule struct { service log.ILogService `autowired:""` clusterService cluster.IClusterService `autowired:""` - transaction store.ITransaction `autowired:""` + + transaction store.ITransaction `autowired:""` + //scheduleCtx context.Context + scheduleCancel context.CancelFunc } var labels = map[string]string{ @@ -70,6 +78,11 @@ var logFormatter = map[string]interface{}{ "$authorization", "$response_body", "$proxy_response_body", + "$ai_provider", + "$ai_model", + "$ai_model_input_token", + "$ai_model_output_token", + "$ai_model_total_token", }, } @@ -135,6 +148,11 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S return err } log_driver.SetDriver(driver, d) + newCtx, cancel := context.WithCancel(context.Background()) + newCtx = utils.SetUserId(newCtx, "admin") + i.scheduleCancel() + i.scheduleCancel = cancel + i.scheduleUpdateLogRecord(newCtx) return nil }) } @@ -164,8 +182,15 @@ func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSour }, nil } -func (i *imlLogModule) OnComplete() { +func (i *imlLogModule) OnInit() { + register.Handle(func(v server.Server) { + ctx, cancel := context.WithCancel(context.Background()) + ctx = utils.SetUserId(ctx, "admin") + //i.scheduleCtx = ctx + i.scheduleCancel = cancel + i.scheduleUpdateLogRecord(ctx) + }) } func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error { @@ -222,3 +247,152 @@ func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, client return nil } + +const ( + oneSecond = 1 + oneMinute = 60 + oneHour = 60 * oneMinute + oneDay = 24 * oneHour +) + +// 定时更新历史记录 +func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) { + driver, has := log_driver.GetDriver("loki") + if !has { + eosc_log.Error("driver loki not found") + return + } + info, err := i.service.GetLogSource(ctx, "loki") + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + eosc_log.Errorf("get log source loki error: %s", err) + return + } + return + } + now := time.Now() + before90Days := now.Add(-7 * 24 * time.Hour) + beginTime := before90Days + if info.LastPullTime.After(before90Days) { + before90Days = info.LastPullTime + } + pauseTime := now + historyFinish := false + go func() { + eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05")) + ticket := time.NewTicker(1 * time.Minute) + defer ticket.Stop() + for { + now = time.Now() + select { + case <-ctx.Done(): + return + case <-ticket.C: + switch { + case now.Sub(beginTime) > oneDay: + endTime := beginTime.Add(oneDay) + err = i.updateLogRecord(ctx, driver, beginTime, endTime) + if err != nil { + eosc_log.Errorf("update log record error: %s", err) + continue + } + err = i.service.UpdateLogSource(ctx, "loki", &log.Save{ + LastPullTime: &endTime, + }) + if err != nil { + eosc_log.Errorf("update log source error: %s", err) + continue + } + beginTime = endTime + case now.Sub(pauseTime) <= oneDay: + endTime := pauseTime + err = i.updateLogRecord(ctx, driver, beginTime, endTime) + if err != nil { + eosc_log.Errorf("update log record error: %s", err) + historyFinish = true + return + } + historyFinish = true + err = i.service.UpdateLogSource(ctx, "loki", &log.Save{ + LastPullTime: &endTime, + }) + if err != nil { + eosc_log.Errorf("update log source error: %s", err) + return + } + eosc_log.Infof("update log record finish") + return + } + } + } + }() + go func() { + eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05")) + ticket := time.NewTicker(10 * time.Second) + defer ticket.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticket.C: + end := time.Now() + start := end.Add(-1 * time.Minute) + err = i.updateLogRecord(ctx, driver, start, end) + if err != nil { + eosc_log.Errorf("update log record error: %s", err) + continue + } + if historyFinish { + err = i.service.UpdateLogSource(ctx, "loki", &log.Save{ + LastPullTime: &end, + }) + if err != nil { + eosc_log.Errorf("update log source error: %s", err) + continue + } + } + } + } + }() + +} + +func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error { + c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID) + if err != nil { + return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID) + } + logs, err := driver.LogRecords(c.Cluster, start, end) + if err != nil { + + return fmt.Errorf("get log records error: %s", err) + } + for _, l := range logs { + err = i.service.InsertLog(ctx, "loki", &log.InsertLog{ + ID: l.ID, + Driver: "loki", + Strategy: l.Strategy, + API: l.API, + Service: l.Service, + Method: l.Method, + Url: l.Url, + RemoteIP: l.RemoteIP, + Consumer: l.Consumer, + Authorization: l.Authorization, + InputToken: l.InputToken, + OutputToken: l.OutputToken, + TotalToken: l.TotalToken, + AIProvider: l.AIProvider, + AIModel: l.AIModel, + StatusCode: l.StatusCode, + ResponseTime: l.ResponseTime, + Traffic: l.Traffic, + RecordTime: l.RecordTime, + }) + if err != nil { + eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID) + continue + } + } + return nil +} diff --git a/module/monitor/format.go b/module/monitor/format.go index e3440274..c391612f 100644 --- a/module/monitor/format.go +++ b/module/monitor/format.go @@ -1,51 +1,9 @@ package monitor import ( - "fmt" - "strconv" "time" ) -func formatCount(count int64) string { - switch { - case count < 1000: - return strconv.FormatInt(count, 10) - case count < 1000000: - return fmt.Sprintf("%.1fK", float64(count)/1000) - case count < 1000000000: - return fmt.Sprintf("%.1fM", float64(count)/1000000) - case count < 1000000000000: - return fmt.Sprintf("%.1fB", float64(count)/1000000000) - default: - return fmt.Sprintf("%.1fT", float64(count)/1000000000000) - } -} - -func formatByte(b int64) string { - const ( - KB = 1000 - MB = KB * 1000 - GB = MB * 1000 - TB = GB * 1000 - PB = TB * 1000 - ) - - switch { - case b < KB: - return fmt.Sprintf("%dB", b) - case b < MB: - return fmt.Sprintf("%.1fKB", float64(b)/KB) - case b < GB: - return fmt.Sprintf("%.1fMB", float64(b)/MB) - case b < TB: - return fmt.Sprintf("%.1fGB", float64(b)/GB) - case b < PB: - return fmt.Sprintf("%.1fTB", float64(b)/TB) - default: - return fmt.Sprintf("%.1fPB", float64(b)/PB) - } -} - const ( oneMinute = 60 oneHour = 3600 diff --git a/module/monitor/iml.go b/module/monitor/iml.go index 0d4bee98..a0d80433 100644 --- a/module/monitor/iml.go +++ b/module/monitor/iml.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/APIParkLab/APIPark/common" + "github.com/APIParkLab/APIPark/gateway" "github.com/eolinker/eosc/log" "github.com/eolinker/go-common/auto" @@ -131,8 +133,8 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service Status5xx: item.Status5xx, }) } - result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) - result.RequestTotal = formatCount(summary.StatusTotal) + result.AvgRequestPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum) + result.RequestTotal = common.FormatCount(summary.StatusTotal) }() go func() { @@ -169,11 +171,11 @@ func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, service }) } - result.AvgTokenPerSubscriber = formatCount(summary.TotalToken / subscriberNum) - result.MaxToken = fmt.Sprintf("%s/s", formatCount(maxToken/timeInterval)) - result.MinToken = fmt.Sprintf("%s/s", formatCount(minToken/timeInterval)) - result.AvgToken = fmt.Sprintf("%s/s", formatCount(summary.OutputToken/timeInterval)) - result.TokenTotal = formatCount(summary.TotalToken) + result.AvgTokenPerSubscriber = common.FormatCount(summary.TotalToken / subscriberNum) + result.MaxToken = fmt.Sprintf("%s/s", common.FormatCount(maxToken/timeInterval)) + result.MinToken = fmt.Sprintf("%s/s", common.FormatCount(minToken/timeInterval)) + result.AvgToken = fmt.Sprintf("%s/s", common.FormatCount(summary.OutputToken/timeInterval)) + result.TokenTotal = common.FormatCount(summary.TotalToken) }() go func() { wg.Wait() @@ -250,8 +252,8 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi Status5xx: item.Status5xx, }) } - result.AvgRequestPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) - result.RequestTotal = formatCount(summary.StatusTotal) + result.AvgRequestPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum) + result.RequestTotal = common.FormatCount(summary.StatusTotal) }() go func() { @@ -288,7 +290,7 @@ func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, servi }) result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, item.StatusTotal/subscriberNum) } - result.AvgTrafficPerSubscriber = formatCount(summary.StatusTotal / subscriberNum) + result.AvgTrafficPerSubscriber = common.FormatCount(summary.StatusTotal / subscriberNum) }() go func() { wg.Wait() @@ -310,13 +312,13 @@ func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *m n := &monitor_dto.TopN{ Id: id, Name: name, - Request: formatCount(item.Request), + Request: common.FormatCount(item.Request), } switch apiKind { case "rest": - n.Traffic = formatByte(item.Traffic) + n.Traffic = common.FormatByte(item.Traffic) case "ai": - n.Token = formatCount(item.Token) + n.Token = common.FormatCount(item.Token) } return n } diff --git a/module/service/dto/output.go b/module/service/dto/output.go index dc8694dc..1901729c 100644 --- a/module/service/dto/output.go +++ b/module/service/dto/output.go @@ -236,3 +236,26 @@ type Overview struct { Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"` APINum int64 `json:"api_num"` } + +type AILogItem struct { + Id string `json:"id"` + API auto.Label `json:"api" aolabel:"api"` + Status int64 `json:"status"` + LogTime auto.TimeLabel `json:"log_time"` + Ip string `json:"ip"` + Token int64 `json:"token"` + TokenPerSecond int64 `json:"token_per_second"` + Consumer auto.Label `json:"consumer" aolabel:"service"` + Provider auto.Label `json:"provider" aolabel:"ai_provider"` + Model string `json:"model"` +} +type RestLogItem struct { + Id string `json:"id"` + API auto.Label `json:"api" aolabel:"api"` + Status int64 `json:"status"` + LogTime auto.TimeLabel `json:"log_time"` + Ip string `json:"ip"` + Consumer auto.Label `json:"consumer" aolabel:"service"` + ResponseTime string `json:"response_time"` + Traffic string `json:"traffic"` +} diff --git a/module/service/iml.go b/module/service/iml.go index 88423d74..861e58ce 100644 --- a/module/service/iml.go +++ b/module/service/iml.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/APIParkLab/APIPark/common" + "github.com/mitchellh/mapstructure" "github.com/eolinker/go-common/register" @@ -27,6 +29,7 @@ import ( model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime" "github.com/APIParkLab/APIPark/resources/access" + log_service "github.com/APIParkLab/APIPark/service/log" "github.com/eolinker/eosc/log" "github.com/eolinker/go-common/server" @@ -87,10 +90,51 @@ type imlServiceModule struct { releaseService release.IReleaseService `autowired:""` serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""` + logService log_service.ILogService `autowired:""` transaction store.ITransaction `autowired:""` } +func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) { + list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size) + if err != nil { + return nil, 0, err + } + return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.RestLogItem { + return &service_dto.RestLogItem{ + Id: s.ID, + API: auto.UUID(s.API), + Status: s.StatusCode, + LogTime: auto.TimeLabel(s.RecordTime), + Ip: s.RemoteIP, + Consumer: auto.UUID(s.Consumer), + ResponseTime: common.FormatTime(s.ResponseTime), + Traffic: common.FormatByte(s.Traffic), + } + }), total, nil +} + +func (i *imlServiceModule) AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) { + list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size) + if err != nil { + return nil, 0, err + } + return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.AILogItem { + return &service_dto.AILogItem{ + Id: s.ID, + API: auto.UUID(s.API), + Status: s.StatusCode, + LogTime: auto.TimeLabel(s.RecordTime), + Ip: s.RemoteIP, + Token: s.TotalToken, + TokenPerSecond: s.TotalToken / s.ResponseTime, + Consumer: auto.UUID(s.Consumer), + Provider: auto.UUID(s.AIProvider), + Model: s.AIModel, + } + }), total, nil +} + func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) { info, err := i.serviceService.Get(ctx, id) if err != nil { diff --git a/module/service/module.go b/module/service/module.go index b5f9df1b..b4f253ee 100644 --- a/module/service/module.go +++ b/module/service/module.go @@ -36,6 +36,12 @@ type IServiceModule interface { MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) + ILogModule +} + +type ILogModule interface { + AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) + RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) } type IServiceDocModule interface { diff --git a/plugins/core/service.go b/plugins/core/service.go index 1a0726ae..44e5103e 100644 --- a/plugins/core/service.go +++ b/plugins/core/service.go @@ -44,5 +44,8 @@ func (p *plugin) ServiceApis() []pm3.Api { pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), + + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), + pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView), } } diff --git a/service/log/iml.go b/service/log/iml.go index f514dc31..1c9dd9fb 100644 --- a/service/log/iml.go +++ b/service/log/iml.go @@ -23,7 +23,77 @@ var ( ) type imlLogService struct { - store log_source.ILogSourceStore `autowired:""` + store log_source.ILogSourceStore `autowired:""` + logRecordStore log_source.ILogRecordStore `autowired:""` +} + +func (i *imlLogService) LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) { + list, total, err := i.logRecordStore.ListPage(ctx, "`record_time` between ? and ? and `service` = ?", page, size, []interface{}{ + start, + end, + serviceId, + }, "record_time desc") + if err != nil { + return nil, 0, err + } + return utils.SliceToSlice(list, func(s *log_source.LogRecord) *Item { + return &Item{ + ID: s.UUID, + Strategy: s.Strategy, + Service: s.Service, + API: s.API, + Method: s.Method, + Url: s.Url, + RemoteIP: s.RemoteIP, + Consumer: s.Consumer, + Authorization: s.Authorization, + InputToken: s.InputToken, + OutputToken: s.OutputToken, + TotalToken: s.TotalToken, + AIProvider: s.AIProvider, + AIModel: s.AIModel, + StatusCode: s.StatusCode, + ResponseTime: s.ResponseTime, + Traffic: s.Traffic, + RecordTime: s.RecordTime, + } + }), total, nil + +} + +func (i *imlLogService) InsertLog(ctx context.Context, driver string, input *InsertLog) error { + // 判断日志是否已存在,若已存在,则不插入 + _, err := i.logRecordStore.First(ctx, map[string]interface{}{"uuid": input.ID}) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + log_print.Errorf("get log record %s error: %s", input.ID, err) + return err + } + return i.logRecordStore.Insert(ctx, &log_source.LogRecord{ + UUID: input.ID, + Driver: input.Driver, + Service: input.Service, + API: input.API, + Strategy: input.Strategy, + Method: input.Method, + Url: input.Url, + RemoteIP: input.RemoteIP, + Consumer: input.Consumer, + Authorization: input.Authorization, + InputToken: input.InputToken, + OutputToken: input.OutputToken, + TotalToken: input.TotalToken, + AIProvider: input.AIProvider, + AIModel: input.AIModel, + StatusCode: input.StatusCode, + ResponseTime: input.ResponseTime, + + Traffic: input.Traffic, + RecordTime: input.RecordTime, + }) + } + return nil + } func (i *imlLogService) OnComplete() { @@ -67,9 +137,10 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu if input.Config == nil || *input.Config == "" { return errors.New("config is required") } + now := time.Now() userId := utils.UserId(ctx) - s = &log_source.Log{ + s = &log_source.LogSource{ UUID: input.ID, Cluster: *input.Cluster, Driver: driver, @@ -79,11 +150,19 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu CreateAt: now, UpdateAt: now, } + if input.LastPullTime == nil { + s.LastPullAt = time.Now().Add(-24 * time.Hour) + } else { + s.LastPullAt = *input.LastPullTime + } } else { if input.Config != nil && *input.Config != "" { s.Config = *input.Config } + if input.LastPullTime != nil { + s.LastPullAt = *input.LastPullTime + } s.Updater = utils.UserId(ctx) s.UpdateAt = time.Now() } @@ -129,6 +208,10 @@ func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string, return result, count, nil } +func (i *imlLogService) LogRecords(ctx context.Context, driver string, keyword string, start time.Time, end time.Time) ([]*Item, int64, error) { + panic(errors.New("not implemented")) +} + func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) { d, has := log_driver.GetDriver(driver) if !has { diff --git a/service/log/model.go b/service/log/model.go index c42728ac..0496d23a 100644 --- a/service/log/model.go +++ b/service/log/model.go @@ -7,43 +7,78 @@ import ( ) type Save struct { - ID string - Cluster *string - Config *string + ID string + Cluster *string + Config *string + LastPullTime *time.Time } type Source struct { - ID string - Cluster string - Driver string - Config string - Creator string - Updater string - CreateAt time.Time - UpdateAt time.Time + ID string + Cluster string + Driver string + Config string + Creator string + Updater string + CreateAt time.Time + UpdateAt time.Time + LastPullTime time.Time } -func FromEntity(ov *log_source.Log) *Source { +func FromEntity(ov *log_source.LogSource) *Source { return &Source{ - ID: ov.UUID, - Cluster: ov.Cluster, - Driver: ov.Driver, - Config: ov.Config, - Creator: ov.Creator, - Updater: ov.Updater, - CreateAt: ov.CreateAt, - UpdateAt: ov.UpdateAt, + ID: ov.UUID, + Cluster: ov.Cluster, + Driver: ov.Driver, + Config: ov.Config, + Creator: ov.Creator, + Updater: ov.Updater, + LastPullTime: ov.LastPullAt, + CreateAt: ov.CreateAt, + UpdateAt: ov.UpdateAt, } } -type Item struct { +type InsertLog struct { ID string + Driver string + Strategy string Service string + API string Method string Url string RemoteIP string Consumer string Authorization string + InputToken int64 + OutputToken int64 + TotalToken int64 + AIProvider string + AIModel string + StatusCode int64 + ResponseTime int64 + Traffic int64 + RecordTime time.Time +} + +type Item struct { + ID string + Strategy string + Service string + API string + Method string + Url string + RemoteIP string + Consumer string + Authorization string + InputToken int64 + OutputToken int64 + TotalToken int64 + AIProvider string + AIModel string + StatusCode int64 + ResponseTime int64 + Traffic int64 RecordTime time.Time } diff --git a/service/log/service.go b/service/log/service.go index bedfbf28..60ae45e0 100644 --- a/service/log/service.go +++ b/service/log/service.go @@ -13,8 +13,14 @@ type ILogService interface { UpdateLogSource(ctx context.Context, driver string, input *Save) error GetLogSource(ctx context.Context, driver string) (*Source, error) Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error) + LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error) + InsertLog(ctx context.Context, driver string, input *InsertLog) error +} + +type ILogUpdateService interface { + UpdateLogSource(ctx context.Context, driver string, input *Save) error } func init() { diff --git a/stores/log-source/model.go b/stores/log-source/model.go index b3214cb1..58ecb09e 100644 --- a/stores/log-source/model.go +++ b/stores/log-source/model.go @@ -2,22 +2,54 @@ package log_source import "time" -type Log struct { - Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` - UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` - Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"` - Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` - Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"` - Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` - Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` - CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"` - UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"` +type LogSource struct { + Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` + UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` + Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"` + Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` + Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"` + Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"` + Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"` + LastPullAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:last_pull_at;comment:最后拉取时间"` + CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"` + UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"` } -func (c *Log) IdValue() int64 { +func (c *LogSource) IdValue() int64 { return c.Id } -func (c *Log) TableName() string { +func (c *LogSource) TableName() string { return "log" } + +type LogRecord struct { + Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"` + UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"` + Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"` + Service string `gorm:"column:service;type:varchar(36);NOT NULL;comment:服务ID"` + API string `gorm:"column:api;type:varchar(36);NOT NULL;comment:接口ID"` + Strategy string `gorm:"column:strategy;type:varchar(36);NOT NULL;comment:策略ID"` + Method string `gorm:"column:method;type:varchar(36);NOT NULL;comment:请求方法"` + Url string `gorm:"column:url;type:varchar(255);NOT NULL;comment:请求URL"` + RemoteIP string `gorm:"column:remote_ip;type:varchar(255);NOT NULL;comment:请求IP"` + Consumer string `gorm:"column:consumer;type:varchar(255);NOT NULL;comment:消费者ID"` + Authorization string `gorm:"column:authorization;type:varchar(255);NOT NULL;comment:鉴权ID"` + InputToken int64 `gorm:"column:input_token;type:int(11);NOT NULL;comment:输入令牌"` + OutputToken int64 `gorm:"column:output_token;type:int(11);NOT NULL;comment:输出令牌"` + TotalToken int64 `gorm:"column:total_token;type:int(11);NOT NULL;comment:总令牌"` + AIProvider string `gorm:"column:ai_provider;type:varchar(255);NOT NULL;comment:AI提供商"` + AIModel string `gorm:"column:ai_model;type:varchar(255);NOT NULL;comment:AI模型"` + StatusCode int64 `gorm:"column:status_code;type:int(11);NOT NULL;comment:请求状态码"` + ResponseTime int64 `gorm:"column:response_time;type:int(11);NOT NULL;comment:响应时间"` + Traffic int64 `gorm:"column:traffic;type:BIGINT(20);NOT NULL;comment:流量"` + RecordTime time.Time `gorm:"column:record_time;type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;comment:记录时间"` +} + +func (c *LogRecord) IdValue() int64 { + return c.Id +} + +func (c *LogRecord) TableName() string { + return "log_record" +} diff --git a/stores/log-source/store.go b/stores/log-source/store.go index 7ccfd599..db5800d9 100644 --- a/stores/log-source/store.go +++ b/stores/log-source/store.go @@ -8,15 +8,26 @@ import ( ) type ILogSourceStore interface { - store.IBaseStore[Log] + store.IBaseStore[LogSource] } type storeLogSource struct { - store.Store[Log] + store.Store[LogSource] +} + +type ILogRecordStore interface { + store.IBaseStore[LogRecord] +} + +type storeLogRecord struct { + store.Store[LogRecord] } func init() { autowire.Auto[ILogSourceStore](func() reflect.Value { return reflect.ValueOf(new(storeLogSource)) }) + autowire.Auto[ILogRecordStore](func() reflect.Value { + return reflect.ValueOf(new(storeLogRecord)) + }) }