Compare commits

...

27 Commits

Author SHA1 Message Date
Dot.L f77bd76a14 Merge pull request #301 from APIParkLab/feature/liujian-1.8
Feature/liujian 1.8
2025-05-06 18:53:11 +08:00
ningyv cef548ce7d Merge pull request #299 from APIParkLab/feature/1.8-cx
Feature/1.8 cx
2025-05-06 18:52:12 +08:00
Liujian 00ef4d2cfc update build.sh 2025-05-06 16:01:51 +08:00
Liujian ddd70b0ff5 fix: log detail bug 2025-05-06 15:31:07 +08:00
Liujian 5a1baadf3b fix bug 2025-05-06 14:38:54 +08:00
Liujian cbea45e6e0 update data 2025-05-06 14:11:36 +08:00
Liujian f05457fd2c update data 2025-05-06 12:04:11 +08:00
Liujian bc6875fe9f Merge remote-tracking branch 'origin/feature/1.8-cx' into feature/liujian-1.8 2025-05-06 11:18:15 +08:00
Liujian ef1c48e395 Modify the monitoring table to return field types 2025-05-06 10:43:09 +08:00
Liujian 9572c4157e Merge remote-tracking branch 'github-pro/feature/1.8-cx' into feature/liujian-1.8 2025-04-30 18:55:09 +08:00
Liujian fef49eb32c tmp commit 2025-04-30 18:55:01 +08:00
Liujian 9c1b19a1c7 Merge remote-tracking branch 'github-pro/feature/1.8-cx' into feature/liujian-1.8 2025-04-30 15:57:59 +08:00
Liujian 0cf7f952e2 update service logs 2025-04-30 15:52:48 +08:00
Liujian 94d881cc18 Optimize chart data 2025-04-30 15:05:47 +08:00
Liujian 8927211ea2 fix:ai token monitor bug 2025-04-30 13:59:50 +08:00
Liujian 1d36f4b821 fix monitor bug 2025-04-30 00:24:02 +08:00
Liujian 4e459168df Merge remote-tracking branch 'origin/feature/1.8-cx' into feature/liujian-1.8 2025-04-29 22:56:28 +08:00
Liujian e5f0423a90 Log information returns the newly added Body 2025-04-29 19:23:35 +08:00
Liujian 7dc8d65235 fix bug 2025-04-29 19:16:21 +08:00
Liujian 604a8312ef finish service log module 2025-04-29 19:08:02 +08:00
Liujian a3bebde83c finish log list 2025-04-29 17:44:53 +08:00
Liujian 771c86229d update service overview 2025-04-29 10:18:02 +08:00
Liujian cff536710e finish: monitor overview 2025-04-29 00:34:58 +08:00
Dot.L a22759136e Merge pull request #298 from APIParkLab/feature/1.7-liujian
update docker build script
2025-04-24 16:03:14 +08:00
Liujian b8ebbac2b8 update docker build script 2025-04-24 16:02:32 +08:00
Dot.L 9c4590db07 Merge pull request #297 from APIParkLab/feature/1.7-liujian
Fix: Apikey getting md5 when calling MCP Server at service level
2025-04-22 18:08:51 +08:00
Liujian 7ba8a57793 Fix: Apikey getting md5 when calling MCP Server at service level 2025-04-22 18:08:24 +08:00
51 changed files with 4229 additions and 903 deletions
+2 -1
View File
@@ -7,4 +7,5 @@
/.vscode/
.air.toml
/tmp/
/work
/work
/cmd/
+77
View File
@@ -0,0 +1,77 @@
package common
import (
"fmt"
"strconv"
)
func FormatCountInt64(count int64) string {
switch {
case count < 1000:
return strconv.FormatInt(count, 10)
case count < 1000000:
return fmt.Sprintf("%.1fK", float64(count)/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", float64(count)/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
default:
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
}
}
func FormatCountFloat64(count float64) string {
switch {
case count < 1000:
return fmt.Sprintf("%.1f", count)
case count < 1000000:
return fmt.Sprintf("%.1fK", count/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", count/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", count/1000000000)
default:
return fmt.Sprintf("%.1fT", count/1000000000000)
}
}
func FormatTime(t int64) string {
if t < 1000 {
return strconv.FormatInt(t, 10) + "ms"
}
if t < 1000000 {
return fmt.Sprintf("%.1fs", float64(t)/1000)
}
if t < 1000000000 {
return fmt.Sprintf("%.1fmin", float64(t)/1000000)
}
if t < 1000000000000 {
return fmt.Sprintf("%.1fhour", float64(t)/1000000000)
}
return fmt.Sprintf("%.1D", float64(t)/1000000000000)
}
func FormatByte(b int64) string {
const (
KB = 1000
MB = KB * 1000
GB = MB * 1000
TB = GB * 1000
PB = TB * 1000
)
switch {
case b < KB:
return fmt.Sprintf("%dB", b)
case b < MB:
return fmt.Sprintf("%.1fKB", float64(b)/KB)
case b < GB:
return fmt.Sprintf("%.1fMB", float64(b)/MB)
case b < TB:
return fmt.Sprintf("%.1fGB", float64(b)/GB)
case b < PB:
return fmt.Sprintf("%.1fTB", float64(b)/TB)
default:
return fmt.Sprintf("%.1fPB", float64(b)/PB)
}
}
+2
View File
@@ -29,6 +29,8 @@ func FmtIntFromInterface(val interface{}) int64 {
return int64(ret)
case int:
return int64(ret)
case float64:
return int64(ret)
default:
return 0
}
+61
View File
@@ -2,6 +2,7 @@ package monitor
import (
"fmt"
"strconv"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
@@ -17,6 +18,66 @@ type imlMonitorStatisticController struct {
module monitor.IMonitorStatisticModule `autowired:""`
}
func (i *imlMonitorStatisticController) ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.RestChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.AIChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "ai")
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlMonitorStatisticController) RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "rest")
}
func (i *imlMonitorStatisticController) Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) {
switch dataType {
case monitor_dto.DataTypeApi:
+5
View File
@@ -22,6 +22,11 @@ type IMonitorStatisticController interface {
InvokeTrendInner(ctx *gin.Context, dataType string, typ string, api string, provider string, subscriber string, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error)
StatisticsInner(ctx *gin.Context, dataType string, typ string, id string, input *monitor_dto.StatisticInput) (interface{}, error)
ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error)
ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error)
AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigController interface {
+208 -18
View File
@@ -5,9 +5,13 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
ai_provider_local "github.com/APIParkLab/APIPark/ai-provider/local"
subscribe_dto "github.com/APIParkLab/APIPark/module/subscribe/dto"
@@ -55,10 +59,6 @@ import (
"github.com/google/uuid"
)
//var (
// ollamaConfig = "{\n \"mirostat\": 0,\n \"mirostat_eta\": 0.1,\n \"mirostat_tau\": 5.0,\n \"num_ctx\": 4096,\n \"repeat_last_n\":64,\n \"repeat_penalty\": 1.1,\n \"temperature\": 0.7,\n \"seed\": 42,\n \"num_predict\": 42,\n \"top_k\": 40,\n \"top_p\": 0.9,\n \"min_p\": 0.5\n}\n"
//)
var (
_ IServiceController = (*imlServiceController)(nil)
@@ -66,20 +66,210 @@ var (
)
type imlServiceController struct {
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
transaction store.ITransaction `autowired:""`
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
monitorModule monitor.IMonitorStatisticModule `autowired:""`
monitorConfigModule monitor.IMonitorConfigModule `autowired:""`
transaction store.ITransaction `autowired:""`
}
func (i *imlServiceController) RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
return i.module.RestLogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
return i.module.AILogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.AILogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.RestLogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) {
o, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return o, nil
}
statistics, err := i.monitorModule.ProviderStatistics(ctx, &monitor_dto.StatisticInput{
Services: []string{serviceId},
CommonInput: &monitor_dto.CommonInput{
Start: time.Now().Add(-24 * 30 * time.Hour).Unix(),
End: time.Now().Unix(),
},
})
if err != nil {
return nil, err
}
if len(statistics) < 1 {
return o, nil
}
o.InvokeNum = statistics[0].RequestTotal
return o, nil
}
func (i *imlServiceController) AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
so, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
result := &monitor_dto.ServiceChartAIOverview{
EnableMCP: so.EnableMCP,
SubscriberNum: so.SubscriberNum,
APINum: so.APINum,
ServiceKind: so.ServiceKind,
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return result, nil
}
o, err := i.monitorModule.AIChartOverview(ctx, serviceId, s, e)
if err != nil {
return nil, err
}
result.AvailableMonitor = true
result.ChartAIOverview = o
return result, nil
}
func (i *imlServiceController) RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
so, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
result := &monitor_dto.ServiceChartRestOverview{
EnableMCP: so.EnableMCP,
SubscriberNum: so.SubscriberNum,
APINum: so.APINum,
ServiceKind: so.ServiceKind,
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return result, nil
}
o, err := i.monitorModule.RestChartOverview(ctx, serviceId, s, e)
if err != nil {
return nil, err
}
result.AvailableMonitor = true
result.ChartRestOverview = o
return result, nil
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlServiceController) Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
if serviceId == "" {
return nil, nil, fmt.Errorf("serviceId is required")
}
info, err := i.module.Get(ctx, serviceId)
if err != nil {
return nil, nil, err
}
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
return i.monitorModule.Top(ctx, serviceId, s, e, 10, info.ServiceKind)
}
func (i *imlServiceController) QuickCreateAIService(ctx *gin.Context, input *service_dto.QuickCreateAIService) error {
+15
View File
@@ -3,6 +3,8 @@ package service
import (
"reflect"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
service_dto "github.com/APIParkLab/APIPark/module/service/dto"
"github.com/gin-gonic/gin"
@@ -32,6 +34,19 @@ type IServiceController interface {
Swagger(ctx *gin.Context)
ExportSwagger(ctx *gin.Context)
Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error)
RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error)
ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error)
AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IAppController interface {
+1 -1
View File
@@ -7,7 +7,7 @@ toolchain go1.23.6
require (
github.com/eolinker/ap-account v1.0.15
github.com/eolinker/eosc v0.18.3
github.com/eolinker/go-common v1.1.6
github.com/eolinker/go-common v1.1.7
github.com/gabriel-vasile/mimetype v1.4.4
github.com/getkin/kin-openapi v0.127.0
github.com/gin-contrib/gzip v1.0.1
+2 -2
View File
@@ -32,8 +32,8 @@ github.com/eolinker/ap-account v1.0.15 h1:n6DJeL6RHZ8eLlZUcY2U3H4d/GPaA5oelAx3R0
github.com/eolinker/ap-account v1.0.15/go.mod h1:zm/Ivs6waJ/M/nEszhpPmM6g50y/MKO+5eABFAdeD0g=
github.com/eolinker/eosc v0.18.3 h1:3IK5HkAPnJRfLbQ0FR7kWsZr6Y/OiqqGazvN1q2BL5A=
github.com/eolinker/eosc v0.18.3/go.mod h1:O9PQQXFCpB6fjHf+oFt/LN6EOAv779ItbMixMKCfTfk=
github.com/eolinker/go-common v1.1.6 h1:s+NaQL0InjX/MwWY53+8y8qzAgsULIUc4U6nWXWQ2Nw=
github.com/eolinker/go-common v1.1.6/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
github.com/eolinker/go-common v1.1.7 h1:bi7wDmlCYQGjS3k8Bz/o+Mo9aMJAzmPsBLXWurxPfwk=
github.com/eolinker/go-common v1.1.7/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s=
github.com/getkin/kin-openapi v0.127.0 h1:Mghqi3Dhryf3F8vR370nN67pAERW+3a95vomb3MAREY=
+2 -1
View File
@@ -9,7 +9,8 @@ import (
type ILogDriver interface {
LogInfo(clusterId string, id string) (*LogInfo, error)
LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*LogItem, int64, error)
LogRecords(clusterId string, start time.Time, end time.Time) ([]*LogItem, error)
}
var (
+14 -2
View File
@@ -4,22 +4,34 @@ import (
"time"
)
type Log struct {
type LogItem struct {
ID string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type LogInfo struct {
ID string
*LogItem
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}
+34 -19
View File
@@ -49,29 +49,44 @@ type LogCount struct {
}
type LogInfo struct {
Stream *LogDetail `json:"stream"`
Stream *LogDetail `json:"stream"`
Values []interface{} `json:"values"`
}
type LogDetail struct {
Api string `json:"api"`
Application string `json:"application"`
Strategy string `json:"strategy"`
ContentType string `json:"content_type"`
Cluster string `json:"cluster"`
Msec string `json:"msec"`
Node string `json:"node"`
RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"`
RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"`
type LogBodyDetail struct {
RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"`
ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"`
Provider string `json:"provider"`
Authorization string `json:"authorization"`
SrcIp string `json:"src_ip"`
Status string `json:"status"`
}
type LogDetail struct {
Api string `json:"api"`
Application string `json:"application"`
Strategy string `json:"strategy"`
ContentType string `json:"content_type"`
Cluster string `json:"cluster"`
Msec string `json:"msec"`
Node string `json:"node"`
RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"`
RequestHeader string `json:"request_header"`
RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"`
RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"`
ResponseHeader string `json:"response_header"`
ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"`
Provider string `json:"provider"`
Authorization string `json:"authorization"`
SrcIp string `json:"src_ip"`
Status string `json:"status"`
AIProvider string `json:"ai_provider"`
AIModel string `json:"ai_model"`
AIModelInputToken interface{} `json:"ai_model_input_token"`
AIModelOutputToken interface{} `json:"ai_model_output_token"`
AIModelTotalToken interface{} `json:"ai_model_total_token"`
}
+97 -21
View File
@@ -81,13 +81,39 @@ func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, erro
return nil, fmt.Errorf("no log found")
}
stream := list[0].Stream
requestBody := stream.RequestBody
proxyRequestBody := stream.ProxyBody
proxyResponseBody := stream.ProxyResponseBody
responseBody := stream.ResponseBody
if len(list[0].Values) > 0 {
switch t := list[0].Values[0].(type) {
case []interface{}:
if len(t) > 1 {
v, ok := t[1].(string)
if !ok {
break
}
var tmp LogBodyDetail
err = json.Unmarshal([]byte(v), &tmp)
if err == nil {
requestBody = tmp.RequestBody
proxyRequestBody = tmp.ProxyBody
responseBody = tmp.ResponseBody
proxyResponseBody = tmp.ProxyBody
}
}
}
}
msec, _ := strconv.ParseInt(stream.Msec, 10, 64)
return &log_driver.LogInfo{
ID: stream.RequestId,
LogItem: ToLogItem(stream, msec),
ContentType: stream.ContentType,
RequestBody: stream.RequestBody,
ProxyBody: stream.ProxyBody,
ProxyResponseBody: stream.ProxyResponseBody,
ResponseBody: stream.ResponseBody,
RequestBody: requestBody,
ProxyBody: proxyRequestBody,
ProxyResponseBody: proxyResponseBody,
ResponseBody: responseBody,
RequestHeader: stream.RequestHeader,
ResponseHeader: stream.ResponseHeader,
}, nil
}
@@ -132,7 +158,25 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH
return result, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.LogItem, error) {
if start.After(end) {
return nil, fmt.Errorf("start time is greater than end time")
}
queries := url.Values{}
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json", clusterId))
queries.Set("direction", "backward")
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
log.Debug("query is ", queries.Get("query"))
logs, err := d.recuseLogs(queries, end, 1)
if err != nil {
return nil, err
}
return logs, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.LogItem, int64, error) {
if start.After(end) {
return nil, 0, fmt.Errorf("start time is greater than end time")
}
@@ -177,7 +221,30 @@ func (d *Driver) Logs(clusterId string, conditions map[string]string, start time
return logs, count, nil
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) {
func ToLogItem(detail *LogDetail, msec int64) *log_driver.LogItem {
return &log_driver.LogItem{
ID: detail.RequestId,
Strategy: detail.Strategy,
Service: detail.Provider,
API: detail.Api,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
InputToken: parseToInt64(detail.AIModelInputToken),
OutputToken: parseToInt64(detail.AIModelOutputToken),
TotalToken: parseToInt64(detail.AIModelTotalToken),
AIProvider: detail.AIProvider,
AIModel: detail.AIModel,
StatusCode: parseToInt64(detail.Status),
ResponseTime: parseToInt64(detail.RequestTime),
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
RecordTime: time.UnixMilli(msec),
}
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.LogItem, error) {
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
if err != nil {
@@ -198,24 +265,13 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
}
return d.recuseLogs(queries, time.UnixMilli(msec), offset-1)
}
logs := make([]*log_driver.Log, 0, len(list))
logs := make([]*log_driver.LogItem, 0, len(list))
for _, l := range list {
if l.Stream == nil {
continue
}
detail := l.Stream
msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
logs = append(logs, &log_driver.Log{
ID: detail.RequestId,
Service: detail.Provider,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
RecordTime: time.UnixMilli(msec),
})
msec, _ := strconv.ParseInt(l.Stream.Msec, 10, 64)
logs = append(logs, ToLogItem(l.Stream, msec))
}
sort.Slice(logs, func(i, j int) bool {
return logs[i].RecordTime.After(logs[j].RecordTime)
@@ -223,6 +279,26 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
return logs, nil
}
func parseToInt64(v interface{}) int64 {
switch t := v.(type) {
case int:
return int64(t)
case int64:
return t
case string:
if v == "" {
return 0
}
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return 0
}
return i
default:
return 0
}
}
func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) {
// 先查在这段时间内符合条件的日志数量
queries := url.Values{}
+5 -5
View File
@@ -44,12 +44,12 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
// t.Log(time.Now().Sub(a))
// t.LogItem(time.Now().Sub(a))
// data, err := json.Marshal(result)
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogCount(t *testing.T) {
@@ -67,7 +67,7 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogs(t *testing.T) {
@@ -83,7 +83,7 @@ func TestLoki(t *testing.T) {
// queries.Set("limit", "1")
// now = time.Now()
// result, err := send[map[string]interface{}](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "")
// t.Log(time.Now().Sub(now))
// t.LogItem(time.Now().Sub(now))
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
@@ -91,5 +91,5 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
+1 -1
View File
@@ -124,7 +124,7 @@ func (t *Tool) RegisterMCP(s *server.MCPServer) {
}
apikey := utils.Label(ctx, "apikey")
if apikey != "" {
req.Header.Set("Authorization", utils.Md5(apikey))
req.Header.Set("Authorization", apikey)
}
resp, err := client.Do(req)
+2 -1
View File
@@ -718,7 +718,8 @@ func (i *imlProviderModule) getAiProviders(ctx context.Context) ([]*gateway.Dyna
}
model, has := driver.GetModel(l.DefaultLLM)
if !has {
return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
continue
//return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
}
cfg := make(map[string]interface{})
cfg["provider"] = l.Id
+179 -4
View File
@@ -4,9 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/eolinker/go-common/server"
log_driver "github.com/APIParkLab/APIPark/log-driver"
"github.com/eolinker/go-common/register"
"github.com/eolinker/go-common/utils"
"github.com/APIParkLab/APIPark/gateway"
@@ -16,11 +21,11 @@ import (
"github.com/APIParkLab/APIPark/service/cluster"
"github.com/eolinker/go-common/auto"
log_dto "github.com/APIParkLab/APIPark/module/log/dto"
"github.com/APIParkLab/APIPark/service/log"
eosc_log "github.com/eolinker/eosc/log"
log_print "github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto"
)
var _ ILogModule = (*imlLogModule)(nil)
@@ -28,7 +33,10 @@ var _ ILogModule = (*imlLogModule)(nil)
type imlLogModule struct {
service log.ILogService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
transaction store.ITransaction `autowired:""`
//scheduleCtx context.Context
scheduleCancel context.CancelFunc
}
var labels = map[string]string{
@@ -54,6 +62,7 @@ var logFormatter = map[string]interface{}{
"$proxy_host",
"$proxy_header",
"$proxy_addr",
"$response_header",
"$response_headers",
"$status",
"$content_type",
@@ -70,6 +79,11 @@ var logFormatter = map[string]interface{}{
"$authorization",
"$response_body",
"$proxy_response_body",
"$ai_provider",
"$ai_model",
"$ai_model_input_token",
"$ai_model_output_token",
"$ai_model_total_token",
},
}
@@ -135,6 +149,11 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S
return err
}
log_driver.SetDriver(driver, d)
newCtx, cancel := context.WithCancel(context.Background())
newCtx = utils.SetUserId(newCtx, "admin")
i.scheduleCancel()
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(newCtx)
return nil
})
}
@@ -164,8 +183,15 @@ func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSour
}, nil
}
func (i *imlLogModule) OnComplete() {
func (i *imlLogModule) OnInit() {
register.Handle(func(v server.Server) {
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.SetUserId(ctx, "admin")
//i.scheduleCtx = ctx
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(ctx)
})
}
func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
@@ -222,3 +248,152 @@ func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, client
return nil
}
const (
oneSecond = 1
oneMinute = 60
oneHour = 60 * oneMinute
oneDay = 24 * oneHour
)
// 定时更新历史记录
func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) {
driver, has := log_driver.GetDriver("loki")
if !has {
eosc_log.Error("driver loki not found")
return
}
info, err := i.service.GetLogSource(ctx, "loki")
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
eosc_log.Errorf("get log source loki error: %s", err)
return
}
return
}
now := time.Now()
before90Days := now.Add(-7 * 24 * time.Hour)
beginTime := before90Days
if info.LastPullTime.After(before90Days) {
before90Days = info.LastPullTime
}
pauseTime := now
historyFinish := false
go func() {
eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(1 * time.Minute)
defer ticket.Stop()
for {
now = time.Now()
select {
case <-ctx.Done():
return
case <-ticket.C:
switch {
case now.Sub(beginTime) > oneDay:
endTime := beginTime.Add(oneDay)
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
beginTime = endTime
case now.Sub(pauseTime) <= oneDay:
endTime := pauseTime
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
historyFinish = true
return
}
historyFinish = true
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
return
}
eosc_log.Infof("update log record finish")
return
}
}
}
}()
go func() {
eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(10 * time.Second)
defer ticket.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticket.C:
end := time.Now()
start := end.Add(-1 * time.Minute)
err = i.updateLogRecord(ctx, driver, start, end)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
if historyFinish {
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &end,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
}
}
}
}()
}
func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
logs, err := driver.LogRecords(c.Cluster, start, end)
if err != nil {
return fmt.Errorf("get log records error: %s", err)
}
for _, l := range logs {
err = i.service.InsertLog(ctx, "loki", &log.InsertLog{
ID: l.ID,
Driver: "loki",
Strategy: l.Strategy,
API: l.API,
Service: l.Service,
Method: l.Method,
Url: l.Url,
RemoteIP: l.RemoteIP,
Consumer: l.Consumer,
Authorization: l.Authorization,
InputToken: l.InputToken,
OutputToken: l.OutputToken,
TotalToken: l.TotalToken,
AIProvider: l.AIProvider,
AIModel: l.AIModel,
StatusCode: l.StatusCode,
ResponseTime: l.ResponseTime,
Traffic: l.Traffic,
RecordTime: l.RecordTime,
})
if err != nil {
eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID)
continue
}
}
return nil
}
+1 -1
View File
@@ -270,7 +270,7 @@ func (i *imlMcpModule) Invoke(ctx context.Context, req mcp.CallToolRequest) (*mc
queryParam.Add(k, value)
}
case float64:
queryParam.Add(k, strconv.FormatFloat(v, 'e', -1, 64))
queryParam.Add(k, strconv.FormatFloat(v, 'f', -1, 64))
default:
return nil, fmt.Errorf("invalid query param type: %T", v)
}
+22
View File
@@ -21,4 +21,26 @@ type IExecutor interface {
InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error)
IBasicOverview
IRestOverview
IAIOverview
}
type IBasicOverview interface {
RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error)
ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error)
}
type IRestOverview interface {
TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
}
type IAIOverview interface {
TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error)
}
+331 -6
View File
@@ -3,6 +3,7 @@ package influxdb_v2
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@@ -23,6 +24,9 @@ import (
"github.com/APIParkLab/APIPark/service/monitor"
)
var _ driver.IAIOverview = (*executor)(nil)
var _ driver.IRestOverview = (*executor)(nil)
func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) {
var data InfluxdbV2Config
err := json.Unmarshal([]byte(cfg), &data)
@@ -147,7 +151,7 @@ func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.T
fieldsConditions := []string{"request", "response"}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset)
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -166,9 +170,9 @@ func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Tim
filters := formatFilter(wheres)
proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"}
proxyConditions := []string{"p_total", "p_success", "p_s2xx", "p_s4xx", "p_s5xx"}
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -200,9 +204,9 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "success", "s4xx", "s5xx"}
requestConditions := []string{"total", "success", "2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset)
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -221,7 +225,7 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
proxyConditions := []string{"p_total", "p_success"}
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -359,5 +363,326 @@ func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, g
}
return resultMap, nil
}
func (e *executor) overviewByStatusCode(ctx context.Context, start, end time.Time, table string, wheres []monitor.MonWhereItem, statusCode []string, dataFields []string, fn flux.AggregateFn) ([]time.Time, map[string][]int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
var returnDates []time.Time
var returnResult = make(map[string][]int64)
for _, s := range statusCode {
newWheres := make([]monitor.MonWhereItem, 0, len(wheres)+1)
newWheres = append(newWheres, wheres...)
newWheres = append(newWheres, monitor.MonWhereItem{
Key: "status_code",
Operation: "=",
Values: []string{s},
})
dates, result, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, table, formatFilter(newWheres), dataFields, every, windowOffset, fn)
if err != nil {
return nil, nil, err
}
if len(dates) > 0 {
returnDates = dates
}
for _, v := range dataFields {
key := fmt.Sprintf("%s_%s", s, v)
if _, ok := returnResult[key]; !ok {
returnResult[key] = make([]int64, 0, len(returnDates))
}
returnResult[key] = append(returnResult[key], result[v]...)
}
}
return returnDates, returnResult, nil
}
func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
fieldsConditions := []string{"request", "response"}
statusFilters := []string{"2xx", "4xx", "5xx"}
dates, overview, err := e.overviewByStatusCode(ctx, start, end, "request", wheres, statusFilters, fieldsConditions, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
s2xxRequest := overview["2xx_request"]
s2xxRequestLen := len(s2xxRequest)
s4xxRequest := overview["4xx_request"]
s4xxRequestLen := len(s4xxRequest)
s5xxRequest := overview["5xx_request"]
s5xxRequestLen := len(s5xxRequest)
s2xxResponse := overview["2xx_response"]
s2xxResponseLen := len(s2xxResponse)
s4xxResponse := overview["4xx_response"]
s4xxResponseLen := len(s4xxResponse)
s5xxResponse := overview["5xx_response"]
s5xxResponseLen := len(s5xxResponse)
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
r := new(monitor.StatusCodeOverview)
if s2xxRequestLen > i {
r.Status2xx = s2xxRequest[i]
}
if s4xxRequestLen > i {
r.Status4xx = s4xxRequest[i]
}
if s5xxRequestLen > i {
r.Status5xx = s5xxRequest[i]
}
if s2xxResponseLen > i {
r.Status2xx += s2xxResponse[i]
}
if s4xxResponseLen > i {
r.Status4xx += s4xxResponse[i]
}
if s5xxResponseLen > i {
r.Status5xx += s5xxResponse[i]
}
r.StatusTotal += r.Status2xx + r.Status4xx + r.Status5xx
totalOverview.Status2xx += r.Status2xx
totalOverview.Status4xx += r.Status4xx
totalOverview.Status5xx += r.Status5xx
totalOverview.StatusTotal += r.StatusTotal
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) aggregateSummary(ctx context.Context, start time.Time, end time.Time, measurement string, bucket string, filters string, fields []string) (map[string]*monitor.Aggregate, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("fields is empty")
}
maxFields := make([]string, 0, len(fields))
minFields := make([]string, 0, len(fields))
avgFields := make([]string, 0, len(fields))
for _, field := range fields {
maxFields = append(maxFields, field+"_max")
minFields = append(minFields, field+"_min")
avgFields = append(avgFields, field+"_avg")
}
maxRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "max()",
Fields: maxFields,
})
if err != nil {
return nil, err
}
minRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "min()",
Fields: minFields,
})
if err != nil {
return nil, err
}
avgRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "mean()",
Fields: avgFields,
})
if err != nil {
return nil, err
}
result := make(map[string]*monitor.Aggregate)
for _, field := range fields {
a := new(monitor.Aggregate)
if v, ok := avgRes[field+"_avg"]; ok {
a.Avg = int64(v.(float64))
}
if v, ok := maxRes[field+"_max"]; ok {
a.Max = v.(int64)
}
if v, ok := minRes[field+"_min"]; ok {
a.Min = v.(int64)
}
result[field] = a
}
return result, nil
}
func (e *executor) SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"timing"}
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
if err != nil {
return nil, nil, nil, err
}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
timing := groupValues["timing"]
timingLen := len(timing)
result := make([]int64, 0, len(dates))
for i := range dates {
if timingLen > i {
result = append(result, timing[i])
}
}
return dates, agg["timing"], result, nil
}
func (e *executor) AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"timing_avg"}
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
if err != nil {
return nil, nil, nil, err
}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.AvgFn)
if err != nil {
return nil, nil, nil, err
}
timingAvg := groupValues["timing_avg"]
timingAvgLen := len(timingAvg)
result := make([]int64, 0, len(dates))
for i := range dates {
if timingAvgLen > i {
result = append(result, timingAvg[i])
}
}
return dates, agg["timing"], result, nil
}
func (e *executor) RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "s2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
total := requestValues["total"]
totalLen := len(total)
s2xx := requestValues["s2xx"]
s2xxLen := len(s2xx)
s4xx := requestValues["s4xx"]
s4xxLen := len(s4xx)
s5xx := requestValues["s5xx"]
s5xxLen := len(s5xx)
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
r := new(monitor.StatusCodeOverview)
if totalLen > i {
r.StatusTotal = total[i]
totalOverview.StatusTotal += r.StatusTotal
}
if s2xxLen > i {
r.Status2xx = s2xx[i]
totalOverview.Status2xx += r.Status2xx
}
if s4xxLen > i {
r.Status4xx = s4xx[i]
totalOverview.Status4xx += r.Status4xx
}
if s5xxLen > i {
r.Status5xx = s5xx[i]
totalOverview.Status5xx += r.Status5xx
}
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) {
filters := formatFilter(wheres)
newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end)
statisticsConf := []*flux.StatisticsFilterConf{
{
Measurement: "request",
AggregateFn: "sum()",
Fields: []string{"total", "request", "response", "input_token", "output_token"},
},
{
Measurement: "proxy",
AggregateFn: "sum()",
Fields: []string{"p_total"},
},
}
results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit)
if err != nil {
return nil, err
}
topN := make([]*monitor.TopN, 0, len(results))
for key, result := range results {
n := new(monitor.TopN)
n.Key = key
n.Request = result.Total
n.Token = result.TotalToken
n.Traffic = result.TotalRequest + result.TotalResponse
topN = append(topN, n)
}
return topN, nil
}
func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total_token", "input_token", "output_token"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
//total := requestValues["total_token"]
//totalLen := len(total)
input := requestValues["input_token"]
inputLen := len(input)
output := requestValues["output_token"]
outputLen := len(output)
totalOverview := new(monitor.TokenOverview)
result := make([]*monitor.TokenOverview, 0, len(dates))
for i := range dates {
r := new(monitor.TokenOverview)
if inputLen > i {
r.InputToken = input[i]
}
if outputLen > i {
r.OutputToken = output[i]
}
r.TotalToken = r.InputToken + r.OutputToken
totalOverview.InputToken += r.InputToken
totalOverview.OutputToken += r.OutputToken
totalOverview.TotalToken += r.TotalToken
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error) {
newStartTime, every, offset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, offset, "app")
}
+97 -22
View File
@@ -14,7 +14,8 @@ import (
type IFluxQuery interface {
CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error)
CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error)
// CommonQueryOnce 查询只返回一条结果
CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error)
CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error)
@@ -61,18 +62,30 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI,
totalRequest := common.FmtIntFromInterface(maps["request"])
maxRequest := common.FmtIntFromInterface(maps["request_max"])
minRequest := common.FmtIntFromInterface(maps["request_min"])
totalResponse := common.FmtIntFromInterface(maps["response"])
maxResponse := common.FmtIntFromInterface(maps["response_max"])
minResponse := common.FmtIntFromInterface(maps["response_min"])
inputToken := common.FmtIntFromInterface(maps["input_token"])
outputToken := common.FmtIntFromInterface(maps["output_token"])
//totalToken := common.FmtIntFromInterface(maps["total_token"])
//maxToken := common.FmtIntFromInterface(maps["total_token_max"])
//minToken := common.FmtIntFromInterface(maps["total_token_min"])
resultMap[key] = &FluxStatistics{
Total: total,
Success: success,
ProxyTotal: pTotal,
ProxySuccess: pSuccess,
TotalTiming: totalTiming,
MaxTiming: maxMinTiming,
MinTiming: minTiming,
TotalRequest: totalRequest,
RequestMax: maxRequest,
RequestMin: minRequest,
Total: total,
Success: success,
ProxyTotal: pTotal,
ProxySuccess: pSuccess,
TotalTiming: totalTiming,
MaxTiming: maxMinTiming,
MinTiming: minTiming,
TotalRequest: totalRequest,
RequestMax: maxRequest,
RequestMin: minRequest,
TotalResponse: totalResponse,
ResponseMax: maxResponse,
ResponseMin: minResponse,
TotalToken: inputToken + outputToken,
}
}
@@ -128,10 +141,10 @@ func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.Quer
return resultMap, nil
}
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) {
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) {
fieldConditions := f.assembleTendencyFieldCondition(dataFields)
//拼装请求
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset)
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset, fn)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
@@ -148,21 +161,46 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s
//初始返回内容
dates := make([]time.Time, 0, len(resultList))
resultMap := make(map[string][]int64, len(dataFields))
for _, field := range dataFields {
resultMap[field] = make([]int64, 0, len(resultList))
}
for _, res := range resultList {
for _, field := range dataFields {
resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field]))
}
t, _ := res["_time"].(time.Time)
dates = append(dates, t)
dates = append(dates, t.In(time.Local))
}
return dates, resultMap, nil
}
func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) {
query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, offset, tag)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
if err != nil {
log.Error("flux err=", err)
return 0, nil, err
}
dateMap := map[time.Time]map[string]struct{}{}
tagMap := make(map[string]struct{})
defer result.Close()
for result.Next() {
date := result.Record().Values()["_start"].(time.Time).In(time.Local)
if _, ok := dateMap[date]; !ok {
dateMap[date] = map[string]struct{}{}
}
if vv, ok := result.Record().Values()[tag]; ok {
v := vv.(string)
tagMap[v] = struct{}{}
dateMap[date][v] = struct{}{}
}
}
returnMap := make(map[time.Time]int64)
for k, v := range dateMap {
returnMap[k] = int64(len(v))
}
return int64(len(tagMap)), returnMap, nil
}
func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) {
query := f.getCircularMapFlux(start, end, bucket, filters, fieldsConf)
@@ -172,6 +210,7 @@ func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI,
log.Error("flux err=", err)
return nil, err
}
defer result.Close()
for result.Next() {
return result.Record().Values(), nil
@@ -270,7 +309,7 @@ from(bucket: "%s")
}
return fmt.Sprintf(`
union(tables: [
union(tables: [
%s
])
|> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value")
@@ -278,23 +317,59 @@ union(tables: [
`, strings.Join(streams, ",\n"), groupBy, limitStr)
}
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string {
type AggregateFn string
const (
SumFn AggregateFn = "sum"
MaxFn AggregateFn = "max"
MinFn AggregateFn = "min"
AvgFn AggregateFn = "mean"
)
var (
fns = map[AggregateFn]struct{}{
SumFn: {},
MaxFn: {},
MinFn: {},
}
)
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every, windowOffset string, fn AggregateFn) string {
windowOffsetFlux := ""
if windowOffset != "" {
windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset)
}
if _, ok := fns[fn]; !ok {
fn = SumFn
}
return fmt.Sprintf(`from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
%s
|> group(columns: ["_field"])
|> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> aggregateWindow(every: %s, fn: %s, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table,
filters, fieldConditions, every, windowOffsetFlux)
filters, fieldConditions, every, string(fn), windowOffsetFlux)
}
func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, offset, tag string) string {
windowOffset := ""
if len(offset) > 0 {
windowOffset = fmt.Sprintf(", offset: %s", offset)
}
return fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
|> keep(columns: ["_time", "%s"])
|> window(every: %s%s)
|> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, windowOffset, tag)
}
// assembleTendencyFieldCondition 封装趋势图需要的Field数据
func (f *fluxQuery) assembleTendencyFieldCondition(fieldConditions []string) string {
/*
@@ -2,22 +2,30 @@ package flux
// FluxStatistics flux统计通用字段
type FluxStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
ProxyTotal int64 `json:"p_total"` //转发总数
ProxySuccess int64 `json:"p_success"` //转发成功
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最大响应时间
MinTiming int64 `json:"timing_min"` //最响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最大流量
RequestMin int64 `json:"request_min"` //最流量
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"` //2xx
ProxyTotal int64 `json:"p_total"` //转发
ProxySuccess int64 `json:"p_success"` //转发成功数
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最响应时间
MinTiming int64 `json:"timing_min"` //最小响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最流量
RequestMin int64 `json:"request_min"` //最小流量
TotalResponse int64 `json:"response"` //总请求流量
ResponseMax int64 `json:"response_max"` //最大流量
ResponseMin int64 `json:"response_min"` //最小流量
TotalToken int64 `json:"total_token"` //总token流量
TokenMax int64 `json:"total_token_max"` //最大token流量
TokenMin int64 `json:"total_token_min"` //最小token流量
}
// FluxWarnStatistics flux统计告警通用字段
type FluxWarnStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"`
S4xx int64 `json:"s4xx"`
S5xx int64 `json:"s5xx"`
ProxyTotal int64 `json:"p_total"` //转发总数
@@ -0,0 +1,309 @@
-
task_name: "apinto_day_request_v1"
cron: "0 0 * * *"
offset: "2m30s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_day_proxy_v1"
cron: "0 0 * * *"
offset: "2m45s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -0,0 +1,309 @@
-
task_name: "apinto_hour_request_v1"
cron: "0 * * * *"
offset: "1m30s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> mean()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_hour_proxy_v1"
cron: "0 * * * *"
offset: "1m45s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -0,0 +1,307 @@
-
task_name: "apinto_week_request_v1"
cron: "0 0 * * 1"
offset: "3m30s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_week_proxy_v1"
cron: "0 0 * * 1"
offset: "3m45s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -1,13 +1,18 @@
package flux
import (
"embed"
_ "embed"
"fmt"
"strings"
"gopkg.in/yaml.v3"
"github.com/eolinker/eosc/log"
yaml "gopkg.in/yaml.v3"
)
//go:embed influxdb_config/tasks.yaml
var tasksData []byte
//go:embed tasks/*.yaml
var taskReader embed.FS
var (
taskList []*TaskConf
@@ -22,9 +27,28 @@ type TaskConf struct {
func initTasksConfig() {
conf := make([]*TaskConf, 0, 15)
err := yaml.Unmarshal(tasksData, &conf)
files, err := taskReader.ReadDir("tasks")
if err != nil {
panic(err)
panic(fmt.Sprintf("read tasks dir error: %v", err))
}
for _, file := range files {
if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") {
continue
}
name := fmt.Sprintf("tasks/%s", file.Name())
data, err := taskReader.ReadFile(name)
if err != nil {
log.Errorf("read file(%s) error: %v", name, err)
continue
}
tmp := make([]*TaskConf, 0, 15)
err = yaml.Unmarshal(data, &tmp)
if err != nil {
log.Errorf("unmarshal file(%s) error: %v", name, err)
continue
}
conf = append(conf, tmp...)
}
taskList = conf
}
+14 -6
View File
@@ -15,9 +15,9 @@ const (
tenDay = 10 * oneDay
oneYear = 365 * oneDay
bucketMinuteRetention = (7 - 1) * oneDay
bucketHourRetention = (90 - 1) * oneDay
bucketDayRetention = (5*365 - 1) * oneDay
bucketMinuteRetention = (7) * oneDay
bucketHourRetention = (90) * oneDay
bucketDayRetention = (5 * 365) * oneDay
bucketMinute = "apinto/minute"
bucketHour = "apinto/hour"
@@ -127,11 +127,11 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
switch minimumBucket {
case bucketMinute:
offset := ""
offsetTime := startTime.Minute() % 5
offsetTime := startTime.Minute() % 10
if offsetTime != 0 {
offset = fmt.Sprintf("%dm", offsetTime)
}
return startTime, "5m", offset, bucketMinute
return startTime, "10m", offset, bucketMinute
case bucketHour:
newStart := formatStartTimeHour(startTime, location)
@@ -148,7 +148,15 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
} else if diff <= tenDay {
switch minimumBucket {
case bucketMinute, bucketHour:
case bucketMinute:
offset := ""
offsetTime := startTime.Minute()
if offsetTime != 0 {
offset = fmt.Sprintf("%dm", offsetTime)
}
return startTime, "1h", offset, bucketMinute
case bucketHour:
newStart := formatStartTimeHour(startTime, location)
return newStart, "1h", "", bucketHour
case bucketDay:
+98
View File
@@ -138,3 +138,101 @@ type MonitorCluster struct {
Name string `json:"name"`
Enable bool `json:"enable"`
}
type ChartOverview struct {
}
type StatusCodeOverview struct {
Status2xx int64 `json:"2xx"` //状态码2xx数
Status4xx int64 `json:"4xx"`
Status5xx int64 `json:"5xx"` //状态码5xx数
}
type TokenOverview struct {
TotalToken int64 `json:"total_token"` //总token流量
OutputToken int64 `json:"output_token"`
InputToken int64 `json:"input_token"` //最小token流量
}
type TokenFloatOverview struct {
TotalToken float64 `json:"total_token"` //总token流量
OutputToken float64 `json:"output_token"`
InputToken float64 `json:"input_token"` //最小token流量
}
type ChartAIOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"`
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
RequestTotal int64 `json:"request_total"`
Request2xxTotal int64 `json:"request_2xx_total"`
Request4xxTotal int64 `json:"request_4xx_total"`
Request5xxTotal int64 `json:"request_5xx_total"`
TokenTotal int64 `json:"token_total"` //总token流量
InputTokenTotal int64 `json:"input_token_total"`
OutputTokenTotal int64 `json:"output_token_total"` //最大token流量
TokenOverview []*TokenOverview `json:"token_overview"` //token概况
AvgTokenOverview []float64 `json:"avg_token_overview"`
AvgTokenPerSubscriberOverview []*TokenFloatOverview `json:"avg_token_per_subscriber_overview"`
AvgToken float64 `json:"avg_token"`
MaxToken float64 `json:"max_token"`
MinToken float64 `json:"min_token"`
Date []string `json:"date"`
MaxTokenPerSubscriber float64 `json:"max_token_per_subscriber"`
MinTokenPerSubscriber float64 `json:"min_token_per_subscriber"`
}
type ChartRestOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"` //请求概况
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
RequestTotal int64 `json:"request_total"`
Request2xxTotal int64 `json:"request_2xx_total"`
Request4xxTotal int64 `json:"request_4xx_total"`
Request5xxTotal int64 `json:"request_5xx_total"`
TrafficOverview []*StatusCodeOverview `json:"traffic_overview"` //流量概况
Traffic2xxTotal int64 `json:"traffic_2xx_total"`
Traffic4xxTotal int64 `json:"traffic_4xx_total"` //流量概况
Traffic5xxTotal int64 `json:"traffic_5xx_total"` //流量概况
AvgResponseTimeOverview []int64 `json:"avg_response_time_overview"` //平均响应时间概况
AvgTrafficPerSubscriberOverview []float64 `json:"avg_traffic_per_subscriber_overview"`
TrafficTotal int64 `json:"traffic_total"`
AvgResponseTime int64 `json:"avg_response_time"` //平均响应时间
MaxResponseTime int64 `json:"max_response_time"` //最大响应时间
MinResponseTime int64 `json:"min_response_time"` //最小响应时间
Date []string `json:"date"`
MaxTrafficPerSubscriber float64 `json:"max_traffic_per_subscriber"`
MinTrafficPerSubscriber float64 `json:"min_traffic_per_subscriber"`
}
type ServiceChartRestOverview struct {
EnableMCP bool `json:"enable_mcp"`
SubscriberNum int64 `json:"subscriber_num"`
APINum int64 `json:"api_num"`
ServiceKind string `json:"service_kind"`
AvailableMonitor bool `json:"available_monitor"`
*ChartRestOverview
}
type ServiceChartAIOverview struct {
EnableMCP bool `json:"enable_mcp"`
SubscriberNum int64 `json:"subscriber_num"`
APINum int64 `json:"api_num"`
ServiceKind string `json:"service_kind"`
AvailableMonitor bool `json:"available_monitor"`
*ChartAIOverview
}
type TopN struct {
Id string `json:"id"`
Name string `json:"name"`
Request string `json:"request"`
Traffic string `json:"traffic,omitempty"`
Token string `json:"token,omitempty"`
}
+471 -17
View File
@@ -5,9 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/APIParkLab/APIPark/common"
"github.com/APIParkLab/APIPark/gateway"
"github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto"
@@ -43,6 +47,439 @@ type imlMonitorStatisticModule struct {
apiService api.IAPIService `autowired:""`
}
func (i *imlMonitorStatisticModule) genOverviewWhere(ctx context.Context, serviceId string, apiKind []string) ([]monitor.MonWhereItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
if err != nil {
return nil, err
}
wheres, err := i.genCommonWheres(ctx, clusterId)
if err != nil {
return nil, err
}
if serviceId != "" {
wheres = append(wheres, monitor.MonWhereItem{
Key: "provider",
Operation: "=",
Values: []string{serviceId},
})
}
if len(apiKind) > 0 {
wheres = append(wheres, monitor.MonWhereItem{
Key: "api_kind",
Operation: "in",
Values: apiKind,
})
}
return wheres, nil
}
func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"ai"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(3)
errChan := make(chan error, 3)
result := new(monitor_dto.ChartAIOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for index, item := range items {
consumerNum := consumerMap[date[index]]
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
if result.MinRequestPerSubscriber == 0 || result.MinRequestPerSubscriber > avgRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.RequestTotal = summary.StatusTotal
result.Request2xxTotal = summary.Status2xx
result.Request4xxTotal = summary.Status4xx
result.Request5xxTotal = summary.Status5xx
}()
sumResponseTimes := make([]int64, 0)
go func() {
defer wg.Done()
_, _, items, err := executor.SumResponseTimeOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
for _, item := range items {
sumResponseTimes = append(sumResponseTimes, item)
}
}()
totalTokens := make([]int64, 0)
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
date, summary, items, err := executor.TokenOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.TokenOverview = make([]*monitor_dto.TokenOverview, 0, len(items))
result.AvgTokenOverview = make([]float64, 0, len(items))
result.AvgTokenPerSubscriberOverview = make([]*monitor_dto.TokenFloatOverview, 0, len(items))
var maxToken, minToken int64 = 0, 0
for index, item := range items {
if maxToken < item.TotalToken {
maxToken = item.TotalToken
}
if minToken == 0 || minToken > item.TotalToken {
minToken = item.TotalToken
}
result.TokenOverview = append(result.TokenOverview, &monitor_dto.TokenOverview{
TotalToken: item.TotalToken,
OutputToken: item.OutputToken,
InputToken: item.InputToken,
})
totalTokens = append(totalTokens, item.TotalToken)
consumerNum := consumerMap[date[index]]
avgTotalPerSubscriber := 0.0
avgOutputPerSubscriber := 0.0
avgInputPerSubscriber := 0.0
if consumerNum != 0 {
avgTotalPerSubscriber = float64(item.TotalToken) / float64(consumerNum)
avgOutputPerSubscriber = float64(item.OutputToken) / float64(consumerNum)
avgInputPerSubscriber = float64(item.InputToken) / float64(consumerNum)
if avgTotalPerSubscriber > result.MaxTokenPerSubscriber {
result.MaxTokenPerSubscriber = avgTotalPerSubscriber
}
if result.MinTokenPerSubscriber == 0 || result.MinTokenPerSubscriber > avgTotalPerSubscriber {
result.MinTokenPerSubscriber = avgTotalPerSubscriber
}
}
result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenFloatOverview{
TotalToken: avgTotalPerSubscriber,
OutputToken: avgOutputPerSubscriber,
InputToken: avgInputPerSubscriber,
})
}
result.TokenTotal = summary.TotalToken
result.InputTokenTotal = summary.InputToken
result.OutputTokenTotal = summary.OutputToken
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 3)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
sumResponseTime := 0.0
var maxTokenPerSecond, minTokenPerSecond float64 = 0, 0
for index, token := range totalTokens {
var p float64 = 0
if len(sumResponseTimes) > index && sumResponseTimes[index] > 0 {
p = math.Round(float64(token)*1000*100/float64(sumResponseTimes[index])) / 100
sumResponseTime += float64(sumResponseTimes[index])
}
result.AvgTokenOverview = append(result.AvgTokenOverview, p)
if maxTokenPerSecond < p {
maxTokenPerSecond = p
}
if p > 0 && (minTokenPerSecond == 0 || minTokenPerSecond > p) {
minTokenPerSecond = p
}
}
if sumResponseTime > 0 {
result.AvgToken = math.Round(float64(result.TokenTotal)*1000*100/sumResponseTime) / 100
}
result.MaxToken = maxTokenPerSecond
result.MinToken = minTokenPerSecond
return result, nil
}
func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"rest"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(3)
errChan := make(chan error, 2)
result := new(monitor_dto.ChartRestOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for index, item := range items {
consumerNum := consumerMap[date[index]]
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
if result.MinRequestPerSubscriber == 0 || avgRequestPerSubscriber < result.MinRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.RequestTotal = summary.StatusTotal
result.Request2xxTotal = summary.Status2xx
result.Request4xxTotal = summary.Status4xx
result.Request5xxTotal = summary.Status5xx
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
_, summary, items, err := executor.AvgResponseTimeOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
for _, item := range items {
if item > result.MaxResponseTime {
result.MaxResponseTime = item
}
if item > 0 && (result.MinResponseTime == 0 || item < result.MinResponseTime) {
result.MinResponseTime = item
}
}
result.AvgResponseTimeOverview = items
result.AvgResponseTime = summary.Avg
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
date, summary, items, err := executor.TrafficOverviewByStatusCode(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.TrafficOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
result.AvgTrafficPerSubscriberOverview = make([]float64, 0, len(items))
for index, item := range items {
result.TrafficOverview = append(result.TrafficOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
consumerNum := consumerMap[date[index]]
avgTrafficPerSubscriber := 0.0
if consumerNum != 0 {
avgTrafficPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgTrafficPerSubscriber > result.MaxTrafficPerSubscriber {
result.MaxTrafficPerSubscriber = avgTrafficPerSubscriber
}
if result.MinTrafficPerSubscriber == 0 || result.MinTrafficPerSubscriber > avgTrafficPerSubscriber {
result.MinTrafficPerSubscriber = avgTrafficPerSubscriber
}
}
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, avgTrafficPerSubscriber)
}
result.TrafficTotal = summary.StatusTotal
result.Traffic2xxTotal = summary.Status2xx
result.Traffic4xxTotal = summary.Status4xx
result.Traffic5xxTotal = summary.Status5xx
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 3)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
return result, nil
}
func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *monitor_dto.TopN {
n := &monitor_dto.TopN{
Id: id,
Name: name,
Request: common.FormatCountInt64(item.Request),
}
switch apiKind {
case "rest":
n.Traffic = common.FormatByte(item.Traffic)
case "ai":
n.Token = common.FormatCountInt64(item.Token)
}
return n
}
func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{apiKind})
if err != nil {
return nil, nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, nil, err
}
errChan := make(chan error, 2)
var wg sync.WaitGroup
apisResult, consumersResult := make([]*monitor_dto.TopN, 0), make([]*monitor_dto.TopN, 0)
var errs []error
wg.Add(2)
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "api", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
apiIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apis, err := i.apiService.ListInfo(ctx, apiIds...)
if err != nil {
errChan <- err
return
}
apiMap := utils.SliceToMap(apis, func(t *api.Info) string {
return t.UUID
})
for _, item := range result {
if v, ok := apiMap[item.Key]; ok {
apisResult = append(apisResult, generateTopN(v.UUID, v.Name, item, apiKind))
}
}
}()
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "app", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
appIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apps, err := i.serviceService.AppList(ctx, appIds...)
if err != nil {
errChan <- err
return
}
appMap := utils.SliceToMap(apps, func(t *service.Service) string {
return t.Id
})
appMap["apipark-global"] = &service.Service{
Id: "apipark-global",
Name: "System Consumer",
}
for _, item := range result {
if v, ok := appMap[item.Key]; ok {
consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind))
}
}
}()
// 收集所有错误
go func() {
wg.Wait()
close(errChan)
}()
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, nil, fmt.Errorf("errors occurred: %v", errs)
}
return apisResult, consumersResult, nil
}
func (i *imlMonitorStatisticModule) ApiStatistics(ctx context.Context, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
@@ -142,6 +579,10 @@ func (i *imlMonitorStatisticModule) SubscriberStatistics(ctx context.Context, in
if err != nil {
return nil, err
}
apps = append(apps, &service.Service{
Id: "apipark-global",
Name: "System Consumer",
})
appIds := utils.SliceToSlice(apps, func(p *service.Service) string {
return p.Id
})
@@ -350,18 +791,27 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
if err != nil {
return nil, err
}
var service []*service.Service
var services []*service.Service
switch groupBy {
case "app":
service, err = i.serviceService.AppList(ctx)
services, err = i.serviceService.AppList(ctx)
if err != nil {
return nil, err
}
services = append(services, &service.Service{
Id: "apipark-global",
Name: "System Consumer",
})
case "provider":
service, err = i.serviceService.ServiceList(ctx)
services, err = i.serviceService.ServiceList(ctx)
if err != nil {
return nil, err
}
default:
return nil, errors.New("invalid group by")
}
if err != nil {
return nil, err
}
wheres, err := i.genCommonWheres(ctx, clusterId)
if err != nil {
@@ -379,7 +829,7 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
}
result := make([]*monitor_dto.ServiceStatisticBasicItem, 0, len(statisticMap))
for _, item := range service {
for _, item := range services {
statisticItem := &monitor_dto.ServiceStatisticBasicItem{
Id: item.Id,
@@ -445,17 +895,21 @@ func (i *imlMonitorStatisticModule) ApiStatisticsOnSubscriber(ctx context.Contex
if err != nil {
return nil, err
}
// 根据订阅ID查询订阅的服务列表
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
if err != nil {
return nil, err
}
serviceIds := utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
return t.Service
})
if len(serviceIds) < 1 {
return nil, nil
serviceIds := make([]string, 0)
if subscriberId != "apipark-global" {
// 根据订阅ID查询订阅的服务列表
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
if err != nil {
return nil, err
}
serviceIds = utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
return t.Service
})
if len(serviceIds) < 1 {
return nil, nil
}
}
apiInfos, err := i.apiService.ListInfoForServices(ctx, serviceIds...)
if err != nil {
return nil, err
+4
View File
@@ -43,6 +43,10 @@ type IMonitorStatisticModule interface {
ApiStatisticsOnProvider(ctx context.Context, providerId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
ApiStatisticsOnSubscriber(ctx context.Context, subscriberId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
SubscriberStatisticsOnApi(ctx context.Context, apiId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ServiceStatisticBasicItem, error)
AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error)
RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error)
Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigModule interface {
+29 -22
View File
@@ -66,28 +66,28 @@ type imlPublishModule struct {
func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, clientDriver gateway.IClientDriver) error {
return nil
projects, err := m.serviceService.List(ctx)
if err != nil {
return err
}
projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
return p.Id
})
for _, projectId := range projectIds {
releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
if err != nil {
return err
}
if releaseInfo == nil {
continue
}
err = clientDriver.Project().Online(ctx, releaseInfo)
if err != nil {
return err
}
}
return nil
//projects, err := m.serviceService.List(ctx)
//if err != nil {
// return err
//}
//projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
// return p.Id
//})
//for _, projectId := range projectIds {
// releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
// if err != nil {
// return err
// }
// if releaseInfo == nil {
// continue
// }
//
// err = clientDriver.Project().Online(ctx, releaseInfo)
// if err != nil {
// return err
// }
//}
//return nil
}
func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, commitId string) (*gateway.ProjectRelease, error) {
@@ -110,6 +110,10 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
strategyCommitIds = append(strategyCommitIds, c.Commit)
}
}
serviceInfo, err := m.serviceService.Get(ctx, projectID)
if err != nil {
return nil, err
}
apiInfos, err := m.apiService.ListInfo(ctx, apiIds...)
if err != nil {
@@ -140,6 +144,9 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
},
Path: a.Path,
Methods: a.Methods,
Labels: map[string]string{
"api_kind": serviceInfo.Kind.String(),
},
//Service: a.Upstream,
}
if hasUpstream {
+1
View File
@@ -11,6 +11,7 @@ import (
type Item struct {
Id string `json:"id"`
Name string `json:"name"`
Methods []string `json:"methods"`
Protocols []string `json:"protocols"`
Path string `json:"request_path"`
+1
View File
@@ -205,6 +205,7 @@ func (i *imlRouterModule) Search(ctx context.Context, keyword string, serviceId
}
return &router_dto.Item{
Id: item.UUID,
Name: item.Name,
Methods: item.Methods,
Protocols: protocols,
Path: item.Path,
+77
View File
@@ -221,3 +221,80 @@ type ExportApp struct {
Description string `json:"description"`
Team string `json:"team"`
}
type Overview struct {
Id string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
EnableMCP bool `json:"enable_mcp"`
ServiceKind string `json:"service_kind"`
SubscriberNum int64 `json:"subscriber_num"`
InvokeNum int64 `json:"invoke_num"`
Logo string `json:"logo"`
AvailableMonitor bool `json:"available_monitor"`
IsReleased bool `json:"is_released"`
Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"`
APINum int64 `json:"api_num"`
}
type AILogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Token int64 `json:"token"`
TokenPerSecond int64 `json:"token_per_second"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
}
type RestLogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
}
type RestLogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginRequest `json:"request"`
Response OriginRequest `json:"response"`
}
type AILogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginAIRequest `json:"request"`
Response OriginAIRequest `json:"response"`
}
type OriginRequest struct {
Header string `json:"header"`
Origin string `json:"origin"`
Body string `json:"body"`
}
type OriginAIRequest struct {
OriginRequest
Token int64 `json:"token"`
}
+215 -6
View File
@@ -5,10 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"time"
"github.com/APIParkLab/APIPark/common"
"github.com/mitchellh/mapstructure"
"github.com/eolinker/go-common/register"
@@ -27,6 +30,7 @@ import (
model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime"
"github.com/APIParkLab/APIPark/resources/access"
log_service "github.com/APIParkLab/APIPark/service/log"
"github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/server"
@@ -79,14 +83,217 @@ type imlServiceModule struct {
tagService tag.ITagService `autowired:""`
localModelService ai_local.ILocalModelService `autowired:""`
serviceTagService service_tag.ITagService `autowired:""`
apiService api.IAPIService `autowired:""`
apiDocService api_doc.IAPIDocService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
serviceTagService service_tag.ITagService `autowired:""`
apiService api.IAPIService `autowired:""`
apiDocService api_doc.IAPIDocService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
subscribeServer subscribe.ISubscribeService `autowired:""`
releaseService release.IReleaseService `autowired:""`
serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""`
logService log_service.ILogService `autowired:""`
transaction store.ITransaction `autowired:""`
}
func formatHeader(header string) string {
result, err := url.QueryUnescape(header)
if err != nil {
return header
}
result = strings.ReplaceAll(result, "&", "\n")
result = strings.ReplaceAll(result, "=", ": ")
return result
}
func (i *imlServiceModule) RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
logInfo := &service_dto.RestLogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
ResponseTime: common.FormatTime(info.ResponseTime),
Traffic: common.FormatByte(info.Traffic),
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
Body: info.RequestBody,
},
Response: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
Body: info.ResponseBody,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
response, err := parseAIResponse(info.ResponseBody)
if err != nil {
response = info.ResponseBody
}
logInfo := &service_dto.AILogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
Provider: auto.UUID(info.AIProvider),
Model: info.AIModel,
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
Body: parseAIRequest(info.RequestBody),
},
Token: info.InputToken,
},
Response: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
Body: response,
},
Token: info.OutputToken,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.RestLogItem {
item := &service_dto.RestLogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Consumer: auto.UUID(s.Consumer),
ResponseTime: common.FormatTime(s.ResponseTime),
Traffic: common.FormatByte(s.Traffic),
}
if s.Consumer == "apipark-global" {
item.Consumer = auto.Label{
Id: s.Consumer,
Name: "System Consumer",
}
}
return item
}), total, nil
}
func (i *imlServiceModule) AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.AILogItem {
item := &service_dto.AILogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Token: s.TotalToken,
TokenPerSecond: s.TotalToken * 1000 / s.ResponseTime,
Consumer: auto.UUID(s.Consumer),
Provider: auto.UUID(s.AIProvider),
Model: s.AIModel,
}
if s.Consumer == "apipark-global" {
item.Consumer = auto.Label{
Id: s.Consumer,
Name: "System Consumer",
}
}
return item
}), total, nil
}
func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) {
info, err := i.serviceService.Get(ctx, id)
if err != nil {
return nil, err
}
apiCountMap, err := i.apiDocService.APICountByServices(ctx, id)
if err != nil {
return nil, err
}
subscribeMap, err := i.subscribeServer.CountMapByService(ctx, subscribe.ApplyStatusSubscribe, id)
if err != nil {
return nil, err
}
result := &service_dto.Overview{
Id: info.Id,
Name: info.Name,
Description: info.Description,
EnableMCP: info.EnableMCP,
ServiceKind: info.Kind.String(),
SubscriberNum: subscribeMap[id],
Logo: info.Logo,
Catalogue: auto.UUID(info.Catalogue),
APINum: apiCountMap[id],
}
_, err = i.releaseService.GetRunning(ctx, id)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
} else {
result.IsReleased = true
}
return result, nil
}
func (i *imlServiceModule) OnInit() {
@@ -746,7 +953,9 @@ func (i *imlServiceModule) Delete(ctx context.Context, id string) error {
Id: id,
})
if err != nil {
return err
if err.Error() != "nil" {
return err
}
}
err = client.Subscribe().Offline(ctx, &gateway.SubscribeRelease{
Service: id,
+10
View File
@@ -34,6 +34,16 @@ type IServiceModule interface {
//MySimple 获取我的简易项目列表
MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error)
ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error)
ILogModule
}
type ILogModule interface {
AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IServiceDocModule interface {
+115
View File
@@ -0,0 +1,115 @@
package service
import (
"bufio"
"encoding/json"
"strings"
)
// ChatCompletionChunk represents the structure of a single chunk in the streaming response
type ChatCompletionChunk struct {
Object string `json:"object"`
Choices []Choice `json:"choices"`
}
// ChatCompletion represents the structure of a non-streaming response
type ChatCompletion struct {
Object string `json:"object"`
Choices []FullChoice `json:"choices"`
}
// Choice represents a choice in the streaming chunk
type Choice struct {
Delta Delta `json:"delta"`
FinishReason *string `json:"finish_reason"`
}
// FullChoice represents a choice in the non-streaming response
type FullChoice struct {
Message Message `json:"message"`
}
// Delta represents the delta content in a streaming choice
type Delta struct {
Content string `json:"content"`
Role string `json:"role,omitempty"`
}
// Message represents the message content in a non-streaming choice
type Message struct {
Content string `json:"content"`
Role string `json:"role"`
}
// ParseAIResponse parses both streaming and non-streaming AI responses and returns the concatenated content
func parseAIResponse(input string) (string, error) {
// First, try to parse as a non-streaming response
var nonStreaming ChatCompletion
if err := json.Unmarshal([]byte(input), &nonStreaming); err == nil && nonStreaming.Object == "chat.completion" {
var result strings.Builder
for _, choice := range nonStreaming.Choices {
result.WriteString(choice.Message.Content)
}
return result.String(), nil
}
// If not non-streaming, parse as streaming response
var result strings.Builder
scanner := bufio.NewScanner(strings.NewReader(input))
for scanner.Scan() {
line := scanner.Text()
// Skip empty lines or [DONE]
if line == "" || line == "data: [DONE]" {
continue
}
// Check if line starts with "data: "
if !strings.HasPrefix(line, "data: ") {
continue
}
// Extract JSON data
jsonData := strings.TrimPrefix(line, "data: ")
var chunk ChatCompletionChunk
if err := json.Unmarshal([]byte(jsonData), &chunk); err != nil {
return "", err
}
// Process each choice
for _, choice := range chunk.Choices {
// Append content from delta
result.WriteString(choice.Delta.Content)
// Check if this is the final chunk
if choice.FinishReason != nil && *choice.FinishReason == "stop" {
return result.String(), nil
}
}
}
if err := scanner.Err(); err != nil {
return "", err
}
return result.String(), nil
}
func parseAIRequest(ori string) string {
type aiRequest struct {
Messages []struct {
Role string `json:"role"`
Content string `json:"content"`
} `json:"messages"`
}
var req aiRequest
err := json.Unmarshal([]byte(ori), &req)
if err != nil {
return ori
}
size := len(req.Messages)
if size == 0 {
return ""
}
return req.Messages[size-1].Content
}
+5
View File
@@ -22,5 +22,10 @@ func (p *plugin) monitorStatisticApis() []pm3.Api {
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend", []string{"context", "rest:data_type", "query:id", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrend),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend/:typ", []string{"context", "rest:data_type", "rest:typ", "query:api", "query:provider", "query:subscriber", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrendInner),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/statistics/:typ", []string{"context", "rest:data_type", "rest:typ", "query:id", "body"}, []string{"statistics"}, p.monitorStatisticController.StatisticsInner),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/rest", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartRestOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/ai", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartAIOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/rest", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.RestTopN, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/ai", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.AITopN, access.SystemAnalysisRunViewView),
}
}
+10
View File
@@ -39,5 +39,15 @@ func (p *plugin) ServiceApis() []pm3.Api {
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/swagger/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/apidoc/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/export/openapi/:id", p.serviceController.ExportSwagger),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/monitor/top10", []string{"context", "query:service", "query:start", "query:end"}, []string{"apis", "consumers"}, p.serviceController.Top10, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/rest", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.RestLogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/ai", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.AILogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
}
}
+8 -4
View File
@@ -1,10 +1,14 @@
# 名称:apipark通用镜像
# 创建时间:2022-10-25
FROM centos:7.9.2009
MAINTAINER liujian
FROM alpine:latest
RUN sed -i 's|https://dl-cdn.alpinelinux.org/alpine|https://mirrors.aliyun.com/alpine|g' /etc/apk/repositories \
&& apk update \
&& apk add --no-cache curl tzdata bind-tools
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN ln -sf /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
ARG APP
+1 -1
View File
@@ -73,7 +73,7 @@ build_frontend() {
echo_info "Install dependencies..."
pnpm install --registry https://registry.npmmirror.com --dir ./frontend
echo_info "Build frontend..."
cd ./frontend && pnpm run build
cd ./frontend && pnpm run build --verbose
cd ..
else
echo_info "Need not build frontend."
+9 -5
View File
@@ -10,6 +10,11 @@ BuildMode=$3
if [[ "${BuildMode}" == "" ]];then
BuildMode="all"
fi
if [[ "${ARCH}" == "" ]];then
ARCH="amd64"
fi
# 编译可执行文件
./scripts/build.sh "cmd" "" "${BuildMode}" ${ARCH}
@@ -22,12 +27,11 @@ mkdir -p scripts/cmd/ && cp cmd/${APP} scripts/cmd/ && cp cmd/apipark_ai_event_l
VERSION=$(gen_version)
if [[ "${ARCH}" == "" ]];then
ARCH="amd64"
fi
OPTIONS=""
if [[ "${ARCH}" == "arm" ]];then
SYS_ARCH=$(arch)
if [[ (${SYS_ARCH} == "aarch64" || ${SYS_ARCH} == "arm64") && $ARCH == "amd64" ]];then
OPTIONS="--platform=linux/amd64"
elif [[ ${SYS_ARCH} == "amd64" && $ARCH == "arm64" ]];then
OPTIONS="--platform=linux/arm64"
fi
+29 -3
View File
@@ -1,11 +1,10 @@
#!/bin/sh
set -e
source ./init_config.sh
OLD_IFS="$IFS"
IFS=","
arr=(${REDIS_ADDR})
IFS="$OLD_IFS"
@@ -21,10 +20,11 @@ echo -e "redis:" >> config.yml
echo -e " user_name: ${REDIS_USER_NAME}" >> config.yml
echo -e " password: ${REDIS_PWD}" >> config.yml
echo -e " addr: " >> config.yml
for s in ${arr[@]}
for s in $REDIS_ADDR
do
echo -e " - $s" >> config.yml
done
echo -e "nsq:" >> config.yml
echo -e " addr: ${NSQ_ADDR}" >> config.yml
echo -e " topic_prefix: ${NSQ_TOPIC_PREFIX}" >> config.yml
@@ -38,5 +38,31 @@ echo -e " log_period: ${ERROR_PERIOD}" >> config.yml
cat config.yml
nohup ./apipark >> run.log 2>&1 &
wait_for_apipark
nohup ./apipark_ai_event_listen >> run.log 2>&1 &
if [[ ${Init} == "true" ]];then
login_apipark
r=$(is_init)
if [[ $r == "true" ]];then
echo "Already initialized, skipping initialization."
exit 0
fi
wait_for_influxdb
wait_for_apinto
set_cluster
wait_for_influxdb
set_influxdb
set_loki
set_nsq
set_openapi_config
# 重启apipark
kill -9 $(pgrep apipark)
nohup ./apipark >> run.log 2>&1 &
fi
tail -F run.log
+292
View File
@@ -0,0 +1,292 @@
#!/bin/sh
Cookie=""
if [[ "$ApiparkAddress" == "" ]]; then
ApiparkAddress="http://127.0.0.1:8288"
fi
if [[ "${ApintoAddress}" == "" ]]; then
ApintoAddress="http://apipark-apinto:9400"
fi
if [[ "$InfluxdbAddress" == "" ]]; then
InfluxdbAddress="http://apipark-influxdb:8086"
fi
if [[ "$NSQAddress" == "" ]]; then
NSQAddress="apipark-nsq:4150"
fi
if [[ "$LokiAddress" == "" ]]; then
LokiAddress="http://apipark-loki:3100"
fi
echo_fail() {
printf "\e[91m✘ Error:\e[0m $@\n" >&2
}
echo_pass() {
printf "\e[92m✔ Passed:\e[0m $@\n" >&2
}
echo_warn() {
printf "\e[93m⚠ Warning:\e[0m $@\n" >&2
}
echo_pause() {
printf "\e[94m⏸ Pause:\e[0m $1\n" >&2
}
echo_question() {
printf "\e[95m? Question:\e[0m $@\n" >&2
}
echo_info() {
printf "\e[96m Info:\e[0m $1\n" >&2
}
echo_point() {
printf "\e[94m➜ Point:\e[0m $1\n" >&2
}
echo_bullet() {
printf "\e[94m• Step:\e[0m $1\n" >&2
}
echo_wait() {
printf "\e[95m⏳ Waiting:\e[0m $1\n" >&2
}
echo_split() {
echo "" >&2
echo "" >&2
echo -e "\e[94m────────────────────────────────────────────────────────────\e[0m" >&2
}
request_apipark() {
path=$1
body=$2
method=$3
if [[ "$method" == "" ]]; then
method="POST"
fi
if [[ "$Cookie" == "" ]]; then
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -d '$body' \"${ApiparkAddress}${path}\""
echo_info "Executing: $cmd" # 打印命令
response=$(eval "$cmd")
else
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -H \"Cookie: $Cookie\" -d '$body' \"${ApiparkAddress}${path}\""
echo_info "Executing: $cmd" # 打印命令
response=$(eval "$cmd")
fi
echo "$response"
}
request_apinto() {
path="$1"
body="$2"
echo_info "Executing: curl -i -X POST -H \"Content-Type: application/json\" \"${ApintoAddress}${path}\" -d '$body'"
response=$(curl -i -X POST -H "Content-Type: application/json" "${ApintoAddress}${path}" -d "$body")
status_code=$(echo "$response" | grep -E 'HTTP/[0-9.]+ [0-9]+' | awk '{print $2}' || echo "0")
echo_info "$response"
echo "$status_code"
}
login_apipark() {
# 执行登录请求并捕获响应头
body='{"name":"admin","password":"'"${ADMIN_PASSWORD}"'"}'
response=$(request_apipark "/api/v1/account/login/username" "$body")
# 从响应中提取 Set-Cookie 头
cookie=$(echo "$response" | grep -i "Set-Cookie" | sed 's/Set-Cookie: //;s/;.*//')
# 提取 JSON 主体(假设 JSON 在最后一行或响应中可识别)
json_body=$(echo "$response" | grep '^{.*}$')
# 提取 code 值
code=$(echo "$json_body" | sed 's/.*"code":\([0-9]*\).*/\1/')
# 检查 code 是否为 0
if [ "$code" -eq 0 ]; then
Cookie=$cookie
echo_pass "login success"
else
echo_fail "login failed: $json_body"
exit 1
fi
}
set_cluster() {
# 设置集群地址
body='{"manager_address":"'"${ApintoAddress}"'"}'
path="/api/v1/cluster/reset"
response=$(request_apipark "$path" "$body" "PUT")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Set cluster successfully"
else
echo_fail "Set cluster failed: ${response}"
exit 1
fi
}
set_loki() {
# 设置 loki 地址
body='{"config":{"url":"'"${LokiAddress}"'"}}'
path="/api/v1/log/loki"
response=$(request_apipark "$path" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Set loki successfully"
else
echo_fail "Set loki failed: ${response}"
exit 1
fi
}
set_nsq() {
body="{
\"address\": [
\"${NSQAddress}\"
],
\"description\": \"auto init nsqd config\",
\"driver\": \"nsqd\",
\"formatter\": {
\"ai\": [
\"\$ai_provider\",
\"\$ai_model\",
\"\$ai_model_input_token\",
\"\$ai_model_output_token\",
\"\$ai_model_total_token\",
\"\$ai_model_cost\",
\"\$ai_provider_statuses\"
],
\"fields\": [
\"\$time_iso8601\",
\"\$request_id\",
\"\$api\",
\"\$provider\",
\"@ai\"
]
},
\"scopes\": [
\"access_log\"
],
\"topic\": \"apipark_ai_event\",
\"type\": \"json\"
}"
status_code=$(request_apinto "/api/output/ai_event" "$body")
echo "Status code: $status_code"
if [ "$status_code" -eq 200 ]; then
echo_pass "Update nsq successfully"
else
echo_fail "Update nsq failed: ${status_code}"
exit 1
fi
}
set_influxdb() {
if [ -z "$InfluxdbToken" ]; then
echo_fail "Influxdb token is empty"
exit 1
fi
if [ -z "$InfluxdbOrg" ]; then
InfluxdbOrg="apipark"
fi
body='{"driver":"influxdb-v2","config":{"addr":"'"${InfluxdbAddress}"'","org":"'"${InfluxdbOrg}"'","token":"'${InfluxdbToken}'"}}'
response=$(request_apipark "/api/v1/monitor/config" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Update influxdb config successfully"
else
echo_fail "Update influxdb config failed: ${response}"
exit 1
fi
}
retry() {
max_wait="$1"
shift
cmd="$@"
sleep_interval=2
curr_wait=0
until $cmd
do
if [ "$curr_wait" -ge "$max_wait" ]
then
echo "Command '$cmd' failed after $curr_wait seconds."
return 1
else
curr_wait=$((curr_wait + sleep_interval))
sleep "$sleep_interval"
fi
done
}
is_init() {
path="/api/v1/system/general"
method="GET"
response=$(request_apipark "$path" "" "$method")
# 从响应中提取 site_prefix
site_prefix=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"site_prefix":"\{0,1\}\([^"]*\)"\{0,1\}.*/\1/')
if [ -z "$site_prefix" ]; then
echo_pass "No apipark openapi address set"
echo false
else
echo_pass "Apipark openapi address found: $site_prefix"
echo true
fi
}
set_openapi_config() {
IP=$(dig +short myip.opendns.com @resolver1.opendns.com | grep -E '^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$')
if [ -z "$IP" ]; then
echo_fail "Failed to resolve IP address"
exit 1
fi
body='{"site_prefix":"http://'"${IP}"':18288"}'
response=$(request_apipark "/api/v1/system/general" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Update apipark openapi address successfully"
else
echo_fail "Update apipark openapi address failed: ${response}"
exit 1
fi
}
wait_for() {
waitName=$1
cmd=$2
echo ${cmd}
echo_wait "Waiting for ${waitName} to start..."
retry 30 ${cmd}
if [ $? -eq 0 ]; then
echo_pass "${waitName} has been installed successfully"
else
echo_fail "${waitName} installation failed"
exit 1
fi
}
wait_for_apipark() {
wait_for "apipark" "curl -s -o /dev/null ${ApiparkAddress}/api/v1/account/login"
}
wait_for_apinto() {
wait_for "apinto" "curl -s -o /dev/null ${ApintoAddress}/api/router"
}
wait_for_influxdb() {
wait_for "influxdb" "curl -s -o /dev/null ${InfluxdbAddress}/api/v2/health"
}
+107 -3
View File
@@ -23,7 +23,77 @@ var (
)
type imlLogService struct {
store log_source.ILogSourceStore `autowired:""`
store log_source.ILogSourceStore `autowired:""`
logRecordStore log_source.ILogRecordStore `autowired:""`
}
func (i *imlLogService) LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) {
list, total, err := i.logRecordStore.ListPage(ctx, "`record_time` between ? and ? and `service` = ?", page, size, []interface{}{
start,
end,
serviceId,
}, "record_time desc")
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_source.LogRecord) *Item {
return &Item{
ID: s.UUID,
Strategy: s.Strategy,
Service: s.Service,
API: s.API,
Method: s.Method,
Url: s.Url,
RemoteIP: s.RemoteIP,
Consumer: s.Consumer,
Authorization: s.Authorization,
InputToken: s.InputToken,
OutputToken: s.OutputToken,
TotalToken: s.TotalToken,
AIProvider: s.AIProvider,
AIModel: s.AIModel,
StatusCode: s.StatusCode,
ResponseTime: s.ResponseTime,
Traffic: s.Traffic,
RecordTime: s.RecordTime,
}
}), total, nil
}
func (i *imlLogService) InsertLog(ctx context.Context, driver string, input *InsertLog) error {
// 判断日志是否已存在,若已存在,则不插入
_, err := i.logRecordStore.First(ctx, map[string]interface{}{"uuid": input.ID})
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
log_print.Errorf("get log record %s error: %s", input.ID, err)
return err
}
return i.logRecordStore.Insert(ctx, &log_source.LogRecord{
UUID: input.ID,
Driver: input.Driver,
Service: input.Service,
API: input.API,
Strategy: input.Strategy,
Method: input.Method,
Url: input.Url,
RemoteIP: input.RemoteIP,
Consumer: input.Consumer,
Authorization: input.Authorization,
InputToken: input.InputToken,
OutputToken: input.OutputToken,
TotalToken: input.TotalToken,
AIProvider: input.AIProvider,
AIModel: input.AIModel,
StatusCode: input.StatusCode,
ResponseTime: input.ResponseTime,
Traffic: input.Traffic,
RecordTime: input.RecordTime,
})
}
return nil
}
func (i *imlLogService) OnComplete() {
@@ -67,9 +137,10 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
if input.Config == nil || *input.Config == "" {
return errors.New("config is required")
}
now := time.Now()
userId := utils.UserId(ctx)
s = &log_source.Log{
s = &log_source.LogSource{
UUID: input.ID,
Cluster: *input.Cluster,
Driver: driver,
@@ -79,11 +150,19 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
CreateAt: now,
UpdateAt: now,
}
if input.LastPullTime == nil {
s.LastPullAt = time.Now().Add(-24 * time.Hour)
} else {
s.LastPullAt = *input.LastPullTime
}
} else {
if input.Config != nil && *input.Config != "" {
s.Config = *input.Config
}
if input.LastPullTime != nil {
s.LastPullAt = *input.LastPullTime
}
s.Updater = utils.UserId(ctx)
s.UpdateAt = time.Now()
}
@@ -129,6 +208,10 @@ func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string,
return result, count, nil
}
func (i *imlLogService) LogRecords(ctx context.Context, driver string, keyword string, start time.Time, end time.Time) ([]*Item, int64, error) {
panic(errors.New("not implemented"))
}
func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) {
d, has := log_driver.GetDriver(driver)
if !has {
@@ -147,11 +230,32 @@ func (i *imlLogService) LogInfo(ctx context.Context, driver string, cluster stri
return nil, err
}
return &Info{
ID: info.ID,
Item: Item{
ID: info.ID,
Strategy: info.Strategy,
Service: info.Service,
API: info.API,
Method: info.Method,
Url: info.Url,
RemoteIP: info.RemoteIP,
Consumer: info.Consumer,
Authorization: info.Authorization,
InputToken: info.InputToken,
OutputToken: info.OutputToken,
TotalToken: info.TotalToken,
AIProvider: info.AIProvider,
AIModel: info.AIModel,
StatusCode: info.StatusCode,
ResponseTime: info.ResponseTime,
Traffic: info.Traffic,
RecordTime: info.RecordTime,
},
ContentType: info.ContentType,
RequestBody: info.RequestBody,
ProxyBody: info.ProxyBody,
ProxyResponseBody: info.ProxyResponseBody,
ResponseBody: info.ResponseBody,
RequestHeader: info.RequestHeader,
ResponseHeader: info.ResponseHeader,
}, nil
}
+59 -22
View File
@@ -7,51 +7,88 @@ import (
)
type Save struct {
ID string
Cluster *string
Config *string
ID string
Cluster *string
Config *string
LastPullTime *time.Time
}
type Source struct {
ID string
Cluster string
Driver string
Config string
Creator string
Updater string
CreateAt time.Time
UpdateAt time.Time
ID string
Cluster string
Driver string
Config string
Creator string
Updater string
CreateAt time.Time
UpdateAt time.Time
LastPullTime time.Time
}
func FromEntity(ov *log_source.Log) *Source {
func FromEntity(ov *log_source.LogSource) *Source {
return &Source{
ID: ov.UUID,
Cluster: ov.Cluster,
Driver: ov.Driver,
Config: ov.Config,
Creator: ov.Creator,
Updater: ov.Updater,
CreateAt: ov.CreateAt,
UpdateAt: ov.UpdateAt,
ID: ov.UUID,
Cluster: ov.Cluster,
Driver: ov.Driver,
Config: ov.Config,
Creator: ov.Creator,
Updater: ov.Updater,
LastPullTime: ov.LastPullAt,
CreateAt: ov.CreateAt,
UpdateAt: ov.UpdateAt,
}
}
type Item struct {
type InsertLog struct {
ID string
Driver string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type Item struct {
ID string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type Info struct {
ID string
Item
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}
+6
View File
@@ -13,8 +13,14 @@ type ILogService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error
GetLogSource(ctx context.Context, driver string) (*Source, error)
Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error)
LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error)
LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error)
InsertLog(ctx context.Context, driver string, input *InsertLog) error
}
type ILogUpdateService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error
}
func init() {
+26
View File
@@ -157,3 +157,29 @@ type MonTrendValues struct {
Names []string
Values [][]interface{}
}
type StatusCodeOverview struct {
Status2xx int64
Status4xx int64
Status5xx int64
StatusTotal int64
}
type TokenOverview struct {
InputToken int64
OutputToken int64
TotalToken int64
}
type TopN struct {
Key string
Request int64
Token int64
Traffic int64
}
type Aggregate struct {
Max int64
Min int64
Avg int64
}
+44 -12
View File
@@ -2,22 +2,54 @@ package log_source
import "time"
type Log struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
type LogSource struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
LastPullAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:last_pull_at;comment:最后拉取时间"`
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
}
func (c *Log) IdValue() int64 {
func (c *LogSource) IdValue() int64 {
return c.Id
}
func (c *Log) TableName() string {
func (c *LogSource) TableName() string {
return "log"
}
type LogRecord struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Service string `gorm:"column:service;type:varchar(36);NOT NULL;comment:服务ID"`
API string `gorm:"column:api;type:varchar(36);NOT NULL;comment:接口ID"`
Strategy string `gorm:"column:strategy;type:varchar(36);NOT NULL;comment:策略ID"`
Method string `gorm:"column:method;type:varchar(36);NOT NULL;comment:请求方法"`
Url string `gorm:"column:url;type:varchar(255);NOT NULL;comment:请求URL"`
RemoteIP string `gorm:"column:remote_ip;type:varchar(255);NOT NULL;comment:请求IP"`
Consumer string `gorm:"column:consumer;type:varchar(255);NOT NULL;comment:消费者ID"`
Authorization string `gorm:"column:authorization;type:varchar(255);NOT NULL;comment:鉴权ID"`
InputToken int64 `gorm:"column:input_token;type:int(11);NOT NULL;comment:输入令牌"`
OutputToken int64 `gorm:"column:output_token;type:int(11);NOT NULL;comment:输出令牌"`
TotalToken int64 `gorm:"column:total_token;type:int(11);NOT NULL;comment:总令牌"`
AIProvider string `gorm:"column:ai_provider;type:varchar(255);NOT NULL;comment:AI提供商"`
AIModel string `gorm:"column:ai_model;type:varchar(255);NOT NULL;comment:AI模型"`
StatusCode int64 `gorm:"column:status_code;type:int(11);NOT NULL;comment:请求状态码"`
ResponseTime int64 `gorm:"column:response_time;type:int(11);NOT NULL;comment:响应时间"`
Traffic int64 `gorm:"column:traffic;type:BIGINT(20);NOT NULL;comment:流量"`
RecordTime time.Time `gorm:"column:record_time;type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;comment:记录时间"`
}
func (c *LogRecord) IdValue() int64 {
return c.Id
}
func (c *LogRecord) TableName() string {
return "log_record"
}
+13 -2
View File
@@ -8,15 +8,26 @@ import (
)
type ILogSourceStore interface {
store.IBaseStore[Log]
store.IBaseStore[LogSource]
}
type storeLogSource struct {
store.Store[Log]
store.Store[LogSource]
}
type ILogRecordStore interface {
store.IBaseStore[LogRecord]
}
type storeLogRecord struct {
store.Store[LogRecord]
}
func init() {
autowire.Auto[ILogSourceStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogSource))
})
autowire.Auto[ILogRecordStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogRecord))
})
}