mirror of
https://github.com/APIParkLab/APIPark.git
synced 2026-06-14 20:41:15 +08:00
Compare commits
143 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ca2682fb22 | |||
| 2bd1d4a423 | |||
| b2baa711c2 | |||
| b55675e5a5 | |||
| 9a33992a0b | |||
| 07ae37eb5f | |||
| c36726f25f | |||
| e2e9abeb4c | |||
| 0fcc2215f7 | |||
| ce559c4643 | |||
| 8d4b13f633 | |||
| 19a3378fa3 | |||
| cef1250199 | |||
| 3c85658931 | |||
| ba7022bc2d | |||
| fb24abc111 | |||
| 0e3fb84e7c | |||
| 5d6d949ca4 | |||
| 3578182343 | |||
| 28bad2d963 | |||
| 384bd239fa | |||
| 98710ad296 | |||
| 4806e12907 | |||
| 9097760a0f | |||
| a5639bff60 | |||
| 1d66ed84f3 | |||
| 249ac3ea1c | |||
| b02db8020d | |||
| 4105540686 | |||
| 36d10c5cfd | |||
| f77bd76a14 | |||
| cef548ce7d | |||
| 5efd19ef7c | |||
| 28cd4fd91c | |||
| 82f4089f42 | |||
| 00ef4d2cfc | |||
| cd33448446 | |||
| ddd70b0ff5 | |||
| e5b50a7073 | |||
| 5a1baadf3b | |||
| 10bd352bf4 | |||
| cbea45e6e0 | |||
| e5c6e4fa82 | |||
| f05457fd2c | |||
| bc6875fe9f | |||
| 1572e03dd1 | |||
| 61025763ed | |||
| ef1c48e395 | |||
| 00905e4167 | |||
| 9572c4157e | |||
| fef49eb32c | |||
| a5a895e42d | |||
| ed1d19532b | |||
| 9c1b19a1c7 | |||
| bf990517dc | |||
| 0cf7f952e2 | |||
| 83873c8c92 | |||
| a61e6ba67f | |||
| 94d881cc18 | |||
| e081580786 | |||
| 8927211ea2 | |||
| 813905ca40 | |||
| 66be761d18 | |||
| 6b6fa5bd40 | |||
| 943ef4f9b0 | |||
| 78d9a1c23c | |||
| 342d022c43 | |||
| 1d36f4b821 | |||
| 4e459168df | |||
| a8c14ee839 | |||
| 23c40efe0d | |||
| a76941ea17 | |||
| 0c392d2092 | |||
| e5f0423a90 | |||
| 46caf49f18 | |||
| 7dc8d65235 | |||
| 74c87ec308 | |||
| 604a8312ef | |||
| 8b318caa0b | |||
| 6bad1c3c7c | |||
| 1333d4ed02 | |||
| a3bebde83c | |||
| d3e91b04a2 | |||
| f4400c0130 | |||
| cc5d677d67 | |||
| e4c3cbc99b | |||
| 771c86229d | |||
| 5c1db00d7e | |||
| cff536710e | |||
| cd91f4bdb9 | |||
| 79860bc665 | |||
| 4623ba6fba | |||
| 1f4acdc99e | |||
| 0307282dbd | |||
| 7dbc2a1a78 | |||
| 12d42c4247 | |||
| a22759136e | |||
| b8ebbac2b8 | |||
| 9c4590db07 | |||
| 7ba8a57793 | |||
| 4eb3368875 | |||
| 4478e6823a | |||
| ab5bffea87 | |||
| 8a48828a76 | |||
| c2b70e23e4 | |||
| eb46a4365c | |||
| 058a8f7974 | |||
| b5585f548a | |||
| e4a3e1a1a2 | |||
| 674a15ef32 | |||
| d82d665280 | |||
| 752db42b3b | |||
| 10aaf85a26 | |||
| 5c97ef9416 | |||
| 5fc84299f1 | |||
| 2eeeebf7c2 | |||
| 155ad537a9 | |||
| 4a1430c62a | |||
| 8cc0d038bd | |||
| 165759398e | |||
| 1091d4e086 | |||
| 0523f13dfb | |||
| 256c04f5bb | |||
| a9dcc78db6 | |||
| 1f6c173e18 | |||
| b593e8b57b | |||
| 1ec00de03c | |||
| bad7fbadda | |||
| 7a506fc15e | |||
| dec2c3a23e | |||
| 2093541c37 | |||
| 40c7ba4305 | |||
| a95bca31e2 | |||
| a541e45a53 | |||
| b20c66b311 | |||
| 729e1f105c | |||
| 6aa96a2ae9 | |||
| 5093c98656 | |||
| f4b70d4e71 | |||
| eeb36f43a4 | |||
| 5a59a6d378 | |||
| c0045d17e2 | |||
| 42963d3ee5 |
+2
-1
@@ -7,4 +7,5 @@
|
||||
/.vscode/
|
||||
.air.toml
|
||||
/tmp/
|
||||
/work
|
||||
/work
|
||||
/cmd/
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
variables:
|
||||
PATH: /opt/go-1.23/go/bin/:/opt/node-1.22/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
|
||||
GOROOT: /opt/go-1.23/go
|
||||
GOPROXY: https://goproxy.cn
|
||||
VERSION: $CI_COMMIT_SHORT_SHA
|
||||
APP: apipark
|
||||
APP_PRE: ${APP}_${VERSION}
|
||||
BUILD_DIR: ${APP}-build
|
||||
DEPLOY_DESC: "DEV 环境"
|
||||
VIEW_ADDR: http://172.18.166.219:8288
|
||||
SAVE_DIR: /opt/${APP}
|
||||
NODE_OPTIONS: --max_old_space_size=8192
|
||||
|
||||
stages:
|
||||
# - notice
|
||||
- build
|
||||
- deploy
|
||||
- webhook
|
||||
#
|
||||
#feishu-informer: # 飞书回调
|
||||
# stage: notice
|
||||
# variables:
|
||||
# DIFF_URL: "$CI_MERGE_REQUEST_PROJECT_URL/-/merge_requests/$CI_MERGE_REQUEST_IID/diffs"
|
||||
# rules:
|
||||
# - if: $CI_PIPELINE_SOURCE=="merge_request_event" && $CI_COMMIT_BRANCH =~ "main-github-pro"
|
||||
# script:
|
||||
# - echo "merge request"
|
||||
# - |
|
||||
# curl -X POST -H "Content-Type: application/json" \
|
||||
# -d "{\"msg_type\":\"text\",\"content\":{\"text\":\"项目:${CI_PROJECT_NAME}\\n提交人:${GITLAB_USER_NAME}\\n提交信息:${CI_MERGE_REQUEST_TITLE}\\n合并分支信息:${CI_MERGE_REQUEST_SOURCE_BRANCH_NAME} -> ${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}\\n差异性地址:${DIFF_URL}\\n请及时review代码\"}}" \
|
||||
# ${FEISHU_WEBHOOK}
|
||||
|
||||
builder:
|
||||
stage: build
|
||||
rules:
|
||||
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
|
||||
script:
|
||||
- set -e
|
||||
- |
|
||||
if [ ! -d "../artifacts" ]; then
|
||||
mkdir -p ../artifacts
|
||||
fi
|
||||
if [ -d "../artifacts/dist" ]; then
|
||||
cp -r ../artifacts/dist frontend/dist
|
||||
fi
|
||||
- |
|
||||
if [ -n "$(git diff --name-status HEAD~1 HEAD -- frontend)" ]; then
|
||||
./scripts/build.sh $BUILD_DIR ${VERSION} all ""
|
||||
else
|
||||
./scripts/build.sh $BUILD_DIR ${VERSION}
|
||||
fi
|
||||
if [ -d "frontend/dist" ]; then
|
||||
echo "copy frontend/dist to artifacts/dist"
|
||||
rm -fr ../artifacts/dist
|
||||
cp -r frontend/dist ../artifacts/dist
|
||||
fi
|
||||
cp $BUILD_DIR/${APP_PRE}_linux_amd64.tar.gz ${SAVE_DIR}
|
||||
|
||||
deployer:
|
||||
stage: deploy
|
||||
rules:
|
||||
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
|
||||
variables:
|
||||
APIPARK_GUEST_MODE: allow
|
||||
APIPARK_GUEST_ID: dklejrfbhjqwdh
|
||||
script:
|
||||
- cd ${SAVE_DIR};mkdir -p ${APP_PRE};tar -zxvf ${APP_PRE}_linux_amd64.tar.gz -C ${APP_PRE};cd ${APP_PRE};./install.sh ${SAVE_DIR};./run.sh restart;cd ${SAVE_DIR} && ./clean.sh ${APP_PRE}
|
||||
when: on_success
|
||||
success:
|
||||
stage: webhook
|
||||
rules:
|
||||
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
|
||||
script:
|
||||
- |
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署完成.\\n访问地址:${VIEW_ADDR}\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
|
||||
${FEISHU_WEBHOOK}
|
||||
when: on_success
|
||||
failure:
|
||||
stage: webhook
|
||||
rules:
|
||||
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
|
||||
script:
|
||||
- |
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署失败,请及时到gitlab上查看\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
|
||||
${FEISHU_WEBHOOK}
|
||||
when: on_failure
|
||||
@@ -0,0 +1,77 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func FormatCountInt64(count int64) string {
|
||||
switch {
|
||||
case count < 1000:
|
||||
return strconv.FormatInt(count, 10)
|
||||
case count < 1000000:
|
||||
return fmt.Sprintf("%.1fK", float64(count)/1000)
|
||||
case count < 1000000000:
|
||||
return fmt.Sprintf("%.1fM", float64(count)/1000000)
|
||||
case count < 1000000000000:
|
||||
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
|
||||
default:
|
||||
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
|
||||
}
|
||||
}
|
||||
|
||||
func FormatCountFloat64(count float64) string {
|
||||
switch {
|
||||
case count < 1000:
|
||||
return fmt.Sprintf("%.1f", count)
|
||||
case count < 1000000:
|
||||
return fmt.Sprintf("%.1fK", count/1000)
|
||||
case count < 1000000000:
|
||||
return fmt.Sprintf("%.1fM", count/1000000)
|
||||
case count < 1000000000000:
|
||||
return fmt.Sprintf("%.1fB", count/1000000000)
|
||||
default:
|
||||
return fmt.Sprintf("%.1fT", count/1000000000000)
|
||||
}
|
||||
}
|
||||
|
||||
func FormatTime(t int64) string {
|
||||
if t < 1000 {
|
||||
return strconv.FormatInt(t, 10) + "ms"
|
||||
}
|
||||
if t < 1000000 {
|
||||
return fmt.Sprintf("%.1fs", float64(t)/1000)
|
||||
}
|
||||
if t < 1000000000 {
|
||||
return fmt.Sprintf("%.1fmin", float64(t)/1000000)
|
||||
}
|
||||
if t < 1000000000000 {
|
||||
return fmt.Sprintf("%.1fhour", float64(t)/1000000000)
|
||||
}
|
||||
return fmt.Sprintf("%.1D", float64(t)/1000000000000)
|
||||
}
|
||||
|
||||
func FormatByte(b int64) string {
|
||||
const (
|
||||
KB = 1000
|
||||
MB = KB * 1000
|
||||
GB = MB * 1000
|
||||
TB = GB * 1000
|
||||
PB = TB * 1000
|
||||
)
|
||||
|
||||
switch {
|
||||
case b < KB:
|
||||
return fmt.Sprintf("%dB", b)
|
||||
case b < MB:
|
||||
return fmt.Sprintf("%.1fKB", float64(b)/KB)
|
||||
case b < GB:
|
||||
return fmt.Sprintf("%.1fMB", float64(b)/MB)
|
||||
case b < TB:
|
||||
return fmt.Sprintf("%.1fGB", float64(b)/GB)
|
||||
case b < PB:
|
||||
return fmt.Sprintf("%.1fTB", float64(b)/TB)
|
||||
default:
|
||||
return fmt.Sprintf("%.1fPB", float64(b)/PB)
|
||||
}
|
||||
}
|
||||
@@ -29,6 +29,8 @@ func FmtIntFromInterface(val interface{}) int64 {
|
||||
return int64(ret)
|
||||
case int:
|
||||
return int64(ret)
|
||||
case float64:
|
||||
return int64(ret)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package monitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/APIParkLab/APIPark/module/monitor"
|
||||
@@ -17,6 +18,66 @@ type imlMonitorStatisticController struct {
|
||||
module monitor.IMonitorStatisticModule `autowired:""`
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticController) ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return i.module.RestChartOverview(ctx, "", s, e)
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticController) ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return i.module.AIChartOverview(ctx, "", s, e)
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticController) AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
l, err := strconv.Atoi(limit)
|
||||
if err != nil {
|
||||
if limit == "" {
|
||||
l = 10
|
||||
} else {
|
||||
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
|
||||
}
|
||||
}
|
||||
return i.module.Top(ctx, "", s, e, l, "ai")
|
||||
}
|
||||
|
||||
func formatTime(start string, end string) (int64, int64, error) {
|
||||
s, err := strconv.ParseInt(start, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
|
||||
}
|
||||
e, err := strconv.ParseInt(end, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
|
||||
}
|
||||
return s, e, nil
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticController) RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
l, err := strconv.Atoi(limit)
|
||||
if err != nil {
|
||||
if limit == "" {
|
||||
l = 10
|
||||
} else {
|
||||
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
|
||||
}
|
||||
}
|
||||
return i.module.Top(ctx, "", s, e, l, "rest")
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticController) Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) {
|
||||
switch dataType {
|
||||
case monitor_dto.DataTypeApi:
|
||||
|
||||
@@ -22,6 +22,11 @@ type IMonitorStatisticController interface {
|
||||
|
||||
InvokeTrendInner(ctx *gin.Context, dataType string, typ string, api string, provider string, subscriber string, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error)
|
||||
StatisticsInner(ctx *gin.Context, dataType string, typ string, id string, input *monitor_dto.StatisticInput) (interface{}, error)
|
||||
|
||||
ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error)
|
||||
ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error)
|
||||
AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
|
||||
RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
|
||||
}
|
||||
|
||||
type IMonitorConfigController interface {
|
||||
|
||||
+208
-18
@@ -5,9 +5,13 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/APIParkLab/APIPark/module/monitor"
|
||||
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
|
||||
|
||||
ai_provider_local "github.com/APIParkLab/APIPark/ai-provider/local"
|
||||
|
||||
subscribe_dto "github.com/APIParkLab/APIPark/module/subscribe/dto"
|
||||
@@ -55,10 +59,6 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
//var (
|
||||
// ollamaConfig = "{\n \"mirostat\": 0,\n \"mirostat_eta\": 0.1,\n \"mirostat_tau\": 5.0,\n \"num_ctx\": 4096,\n \"repeat_last_n\":64,\n \"repeat_penalty\": 1.1,\n \"temperature\": 0.7,\n \"seed\": 42,\n \"num_predict\": 42,\n \"top_k\": 40,\n \"top_p\": 0.9,\n \"min_p\": 0.5\n}\n"
|
||||
//)
|
||||
|
||||
var (
|
||||
_ IServiceController = (*imlServiceController)(nil)
|
||||
|
||||
@@ -66,20 +66,210 @@ var (
|
||||
)
|
||||
|
||||
type imlServiceController struct {
|
||||
module service.IServiceModule `autowired:""`
|
||||
docModule service.IServiceDocModule `autowired:""`
|
||||
subscribeModule subscribe.ISubscribeModule `autowired:""`
|
||||
aiAPIModule ai_api.IAPIModule `autowired:""`
|
||||
routerModule router.IRouterModule `autowired:""`
|
||||
apiDocModule api_doc.IAPIDocModule `autowired:""`
|
||||
providerModule ai.IProviderModule `autowired:""`
|
||||
aiLocalModel ai_local.ILocalModelModule `autowired:""`
|
||||
appModule service.IAppModule `autowired:""`
|
||||
upstreamModule upstream.IUpstreamModule `autowired:""`
|
||||
settingModule system.ISettingModule `autowired:""`
|
||||
teamModule team.ITeamModule `autowired:""`
|
||||
catalogueModule catalogue.ICatalogueModule `autowired:""`
|
||||
transaction store.ITransaction `autowired:""`
|
||||
module service.IServiceModule `autowired:""`
|
||||
docModule service.IServiceDocModule `autowired:""`
|
||||
subscribeModule subscribe.ISubscribeModule `autowired:""`
|
||||
aiAPIModule ai_api.IAPIModule `autowired:""`
|
||||
routerModule router.IRouterModule `autowired:""`
|
||||
apiDocModule api_doc.IAPIDocModule `autowired:""`
|
||||
providerModule ai.IProviderModule `autowired:""`
|
||||
aiLocalModel ai_local.ILocalModelModule `autowired:""`
|
||||
appModule service.IAppModule `autowired:""`
|
||||
upstreamModule upstream.IUpstreamModule `autowired:""`
|
||||
settingModule system.ISettingModule `autowired:""`
|
||||
teamModule team.ITeamModule `autowired:""`
|
||||
catalogueModule catalogue.ICatalogueModule `autowired:""`
|
||||
monitorModule monitor.IMonitorStatisticModule `autowired:""`
|
||||
monitorConfigModule monitor.IMonitorConfigModule `autowired:""`
|
||||
transaction store.ITransaction `autowired:""`
|
||||
}
|
||||
|
||||
func (i *imlServiceController) RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
|
||||
return i.module.RestLogInfo(ctx, serviceId, logId)
|
||||
}
|
||||
|
||||
func (i *imlServiceController) AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
|
||||
return i.module.AILogInfo(ctx, serviceId, logId)
|
||||
}
|
||||
|
||||
func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if serviceId == "" {
|
||||
return nil, 0, fmt.Errorf("service id is empty")
|
||||
}
|
||||
if page == "" {
|
||||
page = "1"
|
||||
}
|
||||
if size == "" {
|
||||
size = "20"
|
||||
}
|
||||
p, err := strconv.Atoi(page)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ps, err := strconv.Atoi(size)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return i.module.AILogs(ctx, serviceId, s, e, p, ps)
|
||||
}
|
||||
|
||||
func (i *imlServiceController) RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if serviceId == "" {
|
||||
return nil, 0, fmt.Errorf("service id is empty")
|
||||
}
|
||||
if page == "" {
|
||||
page = "1"
|
||||
}
|
||||
if size == "" {
|
||||
size = "20"
|
||||
}
|
||||
p, err := strconv.Atoi(page)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ps, err := strconv.Atoi(size)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return i.module.RestLogs(ctx, serviceId, s, e, p, ps)
|
||||
}
|
||||
|
||||
func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) {
|
||||
o, err := i.module.ServiceOverview(ctx, serviceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(cfg.Config) < 1 {
|
||||
return o, nil
|
||||
}
|
||||
statistics, err := i.monitorModule.ProviderStatistics(ctx, &monitor_dto.StatisticInput{
|
||||
Services: []string{serviceId},
|
||||
CommonInput: &monitor_dto.CommonInput{
|
||||
Start: time.Now().Add(-24 * 30 * time.Hour).Unix(),
|
||||
End: time.Now().Unix(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(statistics) < 1 {
|
||||
return o, nil
|
||||
}
|
||||
o.InvokeNum = statistics[0].RequestTotal
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceController) AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if serviceId == "" {
|
||||
return nil, fmt.Errorf("service is required")
|
||||
}
|
||||
so, err := i.module.ServiceOverview(ctx, serviceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := &monitor_dto.ServiceChartAIOverview{
|
||||
EnableMCP: so.EnableMCP,
|
||||
SubscriberNum: so.SubscriberNum,
|
||||
APINum: so.APINum,
|
||||
ServiceKind: so.ServiceKind,
|
||||
}
|
||||
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(cfg.Config) < 1 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
o, err := i.monitorModule.AIChartOverview(ctx, serviceId, s, e)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.AvailableMonitor = true
|
||||
result.ChartAIOverview = o
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceController) RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error) {
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if serviceId == "" {
|
||||
return nil, fmt.Errorf("service is required")
|
||||
}
|
||||
so, err := i.module.ServiceOverview(ctx, serviceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := &monitor_dto.ServiceChartRestOverview{
|
||||
EnableMCP: so.EnableMCP,
|
||||
SubscriberNum: so.SubscriberNum,
|
||||
APINum: so.APINum,
|
||||
ServiceKind: so.ServiceKind,
|
||||
}
|
||||
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(cfg.Config) < 1 {
|
||||
return result, nil
|
||||
}
|
||||
o, err := i.monitorModule.RestChartOverview(ctx, serviceId, s, e)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.AvailableMonitor = true
|
||||
result.ChartRestOverview = o
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func formatTime(start string, end string) (int64, int64, error) {
|
||||
s, err := strconv.ParseInt(start, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
|
||||
}
|
||||
e, err := strconv.ParseInt(end, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
|
||||
}
|
||||
return s, e, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceController) Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
|
||||
if serviceId == "" {
|
||||
return nil, nil, fmt.Errorf("serviceId is required")
|
||||
}
|
||||
info, err := i.module.Get(ctx, serviceId)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
s, e, err := formatTime(start, end)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return i.monitorModule.Top(ctx, serviceId, s, e, 10, info.ServiceKind)
|
||||
}
|
||||
|
||||
func (i *imlServiceController) QuickCreateAIService(ctx *gin.Context, input *service_dto.QuickCreateAIService) error {
|
||||
|
||||
@@ -3,6 +3,8 @@ package service
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
|
||||
|
||||
service_dto "github.com/APIParkLab/APIPark/module/service/dto"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -32,6 +34,19 @@ type IServiceController interface {
|
||||
|
||||
Swagger(ctx *gin.Context)
|
||||
ExportSwagger(ctx *gin.Context)
|
||||
|
||||
Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
|
||||
|
||||
AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error)
|
||||
RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error)
|
||||
|
||||
ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error)
|
||||
|
||||
AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error)
|
||||
|
||||
RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error)
|
||||
RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
|
||||
AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
|
||||
}
|
||||
|
||||
type IAppController interface {
|
||||
|
||||
@@ -7,7 +7,7 @@ toolchain go1.23.6
|
||||
require (
|
||||
github.com/eolinker/ap-account v1.0.15
|
||||
github.com/eolinker/eosc v0.18.3
|
||||
github.com/eolinker/go-common v1.1.6
|
||||
github.com/eolinker/go-common v1.1.7
|
||||
github.com/gabriel-vasile/mimetype v1.4.4
|
||||
github.com/getkin/kin-openapi v0.127.0
|
||||
github.com/gin-contrib/gzip v1.0.1
|
||||
|
||||
@@ -32,8 +32,8 @@ github.com/eolinker/ap-account v1.0.15 h1:n6DJeL6RHZ8eLlZUcY2U3H4d/GPaA5oelAx3R0
|
||||
github.com/eolinker/ap-account v1.0.15/go.mod h1:zm/Ivs6waJ/M/nEszhpPmM6g50y/MKO+5eABFAdeD0g=
|
||||
github.com/eolinker/eosc v0.18.3 h1:3IK5HkAPnJRfLbQ0FR7kWsZr6Y/OiqqGazvN1q2BL5A=
|
||||
github.com/eolinker/eosc v0.18.3/go.mod h1:O9PQQXFCpB6fjHf+oFt/LN6EOAv779ItbMixMKCfTfk=
|
||||
github.com/eolinker/go-common v1.1.6 h1:s+NaQL0InjX/MwWY53+8y8qzAgsULIUc4U6nWXWQ2Nw=
|
||||
github.com/eolinker/go-common v1.1.6/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
|
||||
github.com/eolinker/go-common v1.1.7 h1:bi7wDmlCYQGjS3k8Bz/o+Mo9aMJAzmPsBLXWurxPfwk=
|
||||
github.com/eolinker/go-common v1.1.7/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
|
||||
github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
|
||||
github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s=
|
||||
github.com/getkin/kin-openapi v0.127.0 h1:Mghqi3Dhryf3F8vR370nN67pAERW+3a95vomb3MAREY=
|
||||
|
||||
@@ -9,7 +9,8 @@ import (
|
||||
type ILogDriver interface {
|
||||
LogInfo(clusterId string, id string) (*LogInfo, error)
|
||||
LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
|
||||
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error)
|
||||
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*LogItem, int64, error)
|
||||
LogRecords(clusterId string, start time.Time, end time.Time) ([]*LogItem, error)
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
+14
-2
@@ -4,22 +4,34 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type Log struct {
|
||||
type LogItem struct {
|
||||
ID string
|
||||
Strategy string
|
||||
Service string
|
||||
API string
|
||||
Method string
|
||||
Url string
|
||||
RemoteIP string
|
||||
Consumer string
|
||||
Authorization string
|
||||
InputToken int64
|
||||
OutputToken int64
|
||||
TotalToken int64
|
||||
AIProvider string
|
||||
AIModel string
|
||||
StatusCode int64
|
||||
ResponseTime int64
|
||||
Traffic int64
|
||||
RecordTime time.Time
|
||||
}
|
||||
|
||||
type LogInfo struct {
|
||||
ID string
|
||||
*LogItem
|
||||
ContentType string
|
||||
RequestBody string
|
||||
ProxyBody string
|
||||
ProxyResponseBody string
|
||||
ResponseBody string
|
||||
RequestHeader string
|
||||
ResponseHeader string
|
||||
}
|
||||
|
||||
+34
-19
@@ -49,29 +49,44 @@ type LogCount struct {
|
||||
}
|
||||
|
||||
type LogInfo struct {
|
||||
Stream *LogDetail `json:"stream"`
|
||||
Stream *LogDetail `json:"stream"`
|
||||
Values []interface{} `json:"values"`
|
||||
}
|
||||
|
||||
type LogDetail struct {
|
||||
Api string `json:"api"`
|
||||
Application string `json:"application"`
|
||||
Strategy string `json:"strategy"`
|
||||
ContentType string `json:"content_type"`
|
||||
Cluster string `json:"cluster"`
|
||||
Msec string `json:"msec"`
|
||||
Node string `json:"node"`
|
||||
RequestId string `json:"request_id"`
|
||||
RequestMethod string `json:"request_method"`
|
||||
RequestScheme string `json:"request_scheme"`
|
||||
RequestTime string `json:"request_time"`
|
||||
RequestUri string `json:"request_uri"`
|
||||
type LogBodyDetail struct {
|
||||
RequestBody string `json:"request_body"`
|
||||
ProxyBody string `json:"proxy_body"`
|
||||
ResponseBody string `json:"response_body"`
|
||||
ProxyResponseBody string `json:"proxy_response_body"`
|
||||
Service string `json:"service"`
|
||||
Provider string `json:"provider"`
|
||||
Authorization string `json:"authorization"`
|
||||
SrcIp string `json:"src_ip"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type LogDetail struct {
|
||||
Api string `json:"api"`
|
||||
Application string `json:"application"`
|
||||
Strategy string `json:"strategy"`
|
||||
ContentType string `json:"content_type"`
|
||||
Cluster string `json:"cluster"`
|
||||
Msec string `json:"msec"`
|
||||
Node string `json:"node"`
|
||||
RequestId string `json:"request_id"`
|
||||
RequestMethod string `json:"request_method"`
|
||||
RequestScheme string `json:"request_scheme"`
|
||||
RequestHeader string `json:"request_header"`
|
||||
RequestTime string `json:"request_time"`
|
||||
RequestUri string `json:"request_uri"`
|
||||
RequestBody string `json:"request_body"`
|
||||
ProxyBody string `json:"proxy_body"`
|
||||
ResponseBody string `json:"response_body"`
|
||||
ResponseHeader string `json:"response_header"`
|
||||
ProxyResponseBody string `json:"proxy_response_body"`
|
||||
Service string `json:"service"`
|
||||
Provider string `json:"provider"`
|
||||
Authorization string `json:"authorization"`
|
||||
SrcIp string `json:"src_ip"`
|
||||
Status string `json:"status"`
|
||||
AIProvider string `json:"ai_provider"`
|
||||
AIModel string `json:"ai_model"`
|
||||
AIModelInputToken interface{} `json:"ai_model_input_token"`
|
||||
AIModelOutputToken interface{} `json:"ai_model_output_token"`
|
||||
AIModelTotalToken interface{} `json:"ai_model_total_token"`
|
||||
}
|
||||
|
||||
+97
-21
@@ -81,13 +81,39 @@ func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, erro
|
||||
return nil, fmt.Errorf("no log found")
|
||||
}
|
||||
stream := list[0].Stream
|
||||
requestBody := stream.RequestBody
|
||||
proxyRequestBody := stream.ProxyBody
|
||||
proxyResponseBody := stream.ProxyResponseBody
|
||||
responseBody := stream.ResponseBody
|
||||
if len(list[0].Values) > 0 {
|
||||
switch t := list[0].Values[0].(type) {
|
||||
case []interface{}:
|
||||
if len(t) > 1 {
|
||||
v, ok := t[1].(string)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
var tmp LogBodyDetail
|
||||
err = json.Unmarshal([]byte(v), &tmp)
|
||||
if err == nil {
|
||||
requestBody = tmp.RequestBody
|
||||
proxyRequestBody = tmp.ProxyBody
|
||||
responseBody = tmp.ResponseBody
|
||||
proxyResponseBody = tmp.ProxyBody
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
msec, _ := strconv.ParseInt(stream.Msec, 10, 64)
|
||||
return &log_driver.LogInfo{
|
||||
ID: stream.RequestId,
|
||||
LogItem: ToLogItem(stream, msec),
|
||||
ContentType: stream.ContentType,
|
||||
RequestBody: stream.RequestBody,
|
||||
ProxyBody: stream.ProxyBody,
|
||||
ProxyResponseBody: stream.ProxyResponseBody,
|
||||
ResponseBody: stream.ResponseBody,
|
||||
RequestBody: requestBody,
|
||||
ProxyBody: proxyRequestBody,
|
||||
ProxyResponseBody: proxyResponseBody,
|
||||
ResponseBody: responseBody,
|
||||
RequestHeader: stream.RequestHeader,
|
||||
ResponseHeader: stream.ResponseHeader,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -132,7 +158,25 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
|
||||
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.LogItem, error) {
|
||||
if start.After(end) {
|
||||
return nil, fmt.Errorf("start time is greater than end time")
|
||||
}
|
||||
queries := url.Values{}
|
||||
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json", clusterId))
|
||||
queries.Set("direction", "backward")
|
||||
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
|
||||
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
|
||||
log.Debug("query is ", queries.Get("query"))
|
||||
logs, err := d.recuseLogs(queries, end, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.LogItem, int64, error) {
|
||||
if start.After(end) {
|
||||
return nil, 0, fmt.Errorf("start time is greater than end time")
|
||||
}
|
||||
@@ -177,7 +221,30 @@ func (d *Driver) Logs(clusterId string, conditions map[string]string, start time
|
||||
return logs, count, nil
|
||||
}
|
||||
|
||||
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) {
|
||||
func ToLogItem(detail *LogDetail, msec int64) *log_driver.LogItem {
|
||||
return &log_driver.LogItem{
|
||||
ID: detail.RequestId,
|
||||
Strategy: detail.Strategy,
|
||||
Service: detail.Provider,
|
||||
API: detail.Api,
|
||||
Method: detail.RequestMethod,
|
||||
Url: detail.RequestUri,
|
||||
RemoteIP: detail.SrcIp,
|
||||
Consumer: detail.Application,
|
||||
Authorization: detail.Authorization,
|
||||
InputToken: parseToInt64(detail.AIModelInputToken),
|
||||
OutputToken: parseToInt64(detail.AIModelOutputToken),
|
||||
TotalToken: parseToInt64(detail.AIModelTotalToken),
|
||||
AIProvider: detail.AIProvider,
|
||||
AIModel: detail.AIModel,
|
||||
StatusCode: parseToInt64(detail.Status),
|
||||
ResponseTime: parseToInt64(detail.RequestTime),
|
||||
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
|
||||
RecordTime: time.UnixMilli(msec),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.LogItem, error) {
|
||||
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
|
||||
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
|
||||
if err != nil {
|
||||
@@ -198,24 +265,13 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
|
||||
}
|
||||
return d.recuseLogs(queries, time.UnixMilli(msec), offset-1)
|
||||
}
|
||||
logs := make([]*log_driver.Log, 0, len(list))
|
||||
logs := make([]*log_driver.LogItem, 0, len(list))
|
||||
for _, l := range list {
|
||||
if l.Stream == nil {
|
||||
continue
|
||||
}
|
||||
detail := l.Stream
|
||||
msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
|
||||
|
||||
logs = append(logs, &log_driver.Log{
|
||||
ID: detail.RequestId,
|
||||
Service: detail.Provider,
|
||||
Method: detail.RequestMethod,
|
||||
Url: detail.RequestUri,
|
||||
RemoteIP: detail.SrcIp,
|
||||
Consumer: detail.Application,
|
||||
Authorization: detail.Authorization,
|
||||
RecordTime: time.UnixMilli(msec),
|
||||
})
|
||||
msec, _ := strconv.ParseInt(l.Stream.Msec, 10, 64)
|
||||
logs = append(logs, ToLogItem(l.Stream, msec))
|
||||
}
|
||||
sort.Slice(logs, func(i, j int) bool {
|
||||
return logs[i].RecordTime.After(logs[j].RecordTime)
|
||||
@@ -223,6 +279,26 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
func parseToInt64(v interface{}) int64 {
|
||||
switch t := v.(type) {
|
||||
case int:
|
||||
return int64(t)
|
||||
case int64:
|
||||
return t
|
||||
case string:
|
||||
if v == "" {
|
||||
return 0
|
||||
}
|
||||
i, err := strconv.ParseInt(t, 10, 64)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return i
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) {
|
||||
// 先查在这段时间内符合条件的日志数量
|
||||
queries := url.Values{}
|
||||
|
||||
@@ -44,12 +44,12 @@ func TestLoki(t *testing.T) {
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to send request: %v", err)
|
||||
// }
|
||||
// t.Log(time.Now().Sub(a))
|
||||
// t.LogItem(time.Now().Sub(a))
|
||||
// data, err := json.Marshal(result)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to marshal data: %v", err)
|
||||
// }
|
||||
// t.Log(string(data))
|
||||
// t.LogItem(string(data))
|
||||
//}
|
||||
//
|
||||
//func TestLokiLogCount(t *testing.T) {
|
||||
@@ -67,7 +67,7 @@ func TestLoki(t *testing.T) {
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to marshal data: %v", err)
|
||||
// }
|
||||
// t.Log(string(data))
|
||||
// t.LogItem(string(data))
|
||||
//}
|
||||
//
|
||||
//func TestLokiLogs(t *testing.T) {
|
||||
@@ -83,7 +83,7 @@ func TestLoki(t *testing.T) {
|
||||
// queries.Set("limit", "1")
|
||||
// now = time.Now()
|
||||
// result, err := send[map[string]interface{}](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "")
|
||||
// t.Log(time.Now().Sub(now))
|
||||
// t.LogItem(time.Now().Sub(now))
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to send request: %v", err)
|
||||
// }
|
||||
@@ -91,5 +91,5 @@ func TestLoki(t *testing.T) {
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to marshal data: %v", err)
|
||||
// }
|
||||
// t.Log(string(data))
|
||||
// t.LogItem(string(data))
|
||||
//}
|
||||
|
||||
+1
-1
@@ -124,7 +124,7 @@ func (t *Tool) RegisterMCP(s *server.MCPServer) {
|
||||
}
|
||||
apikey := utils.Label(ctx, "apikey")
|
||||
if apikey != "" {
|
||||
req.Header.Set("Authorization", utils.Md5(apikey))
|
||||
req.Header.Set("Authorization", apikey)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
|
||||
+2
-1
@@ -718,7 +718,8 @@ func (i *imlProviderModule) getAiProviders(ctx context.Context) ([]*gateway.Dyna
|
||||
}
|
||||
model, has := driver.GetModel(l.DefaultLLM)
|
||||
if !has {
|
||||
return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
|
||||
continue
|
||||
//return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
|
||||
}
|
||||
cfg := make(map[string]interface{})
|
||||
cfg["provider"] = l.Id
|
||||
|
||||
+179
-4
@@ -4,9 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/eolinker/go-common/server"
|
||||
|
||||
log_driver "github.com/APIParkLab/APIPark/log-driver"
|
||||
"github.com/eolinker/go-common/register"
|
||||
"github.com/eolinker/go-common/utils"
|
||||
|
||||
"github.com/APIParkLab/APIPark/gateway"
|
||||
|
||||
@@ -16,11 +21,11 @@ import (
|
||||
|
||||
"github.com/APIParkLab/APIPark/service/cluster"
|
||||
|
||||
"github.com/eolinker/go-common/auto"
|
||||
|
||||
log_dto "github.com/APIParkLab/APIPark/module/log/dto"
|
||||
"github.com/APIParkLab/APIPark/service/log"
|
||||
eosc_log "github.com/eolinker/eosc/log"
|
||||
log_print "github.com/eolinker/eosc/log"
|
||||
"github.com/eolinker/go-common/auto"
|
||||
)
|
||||
|
||||
var _ ILogModule = (*imlLogModule)(nil)
|
||||
@@ -28,7 +33,10 @@ var _ ILogModule = (*imlLogModule)(nil)
|
||||
type imlLogModule struct {
|
||||
service log.ILogService `autowired:""`
|
||||
clusterService cluster.IClusterService `autowired:""`
|
||||
transaction store.ITransaction `autowired:""`
|
||||
|
||||
transaction store.ITransaction `autowired:""`
|
||||
//scheduleCtx context.Context
|
||||
scheduleCancel context.CancelFunc
|
||||
}
|
||||
|
||||
var labels = map[string]string{
|
||||
@@ -54,6 +62,7 @@ var logFormatter = map[string]interface{}{
|
||||
"$proxy_host",
|
||||
"$proxy_header",
|
||||
"$proxy_addr",
|
||||
"$response_header",
|
||||
"$response_headers",
|
||||
"$status",
|
||||
"$content_type",
|
||||
@@ -70,6 +79,11 @@ var logFormatter = map[string]interface{}{
|
||||
"$authorization",
|
||||
"$response_body",
|
||||
"$proxy_response_body",
|
||||
"$ai_provider",
|
||||
"$ai_model",
|
||||
"$ai_model_input_token",
|
||||
"$ai_model_output_token",
|
||||
"$ai_model_total_token",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -135,6 +149,11 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S
|
||||
return err
|
||||
}
|
||||
log_driver.SetDriver(driver, d)
|
||||
newCtx, cancel := context.WithCancel(context.Background())
|
||||
newCtx = utils.SetUserId(newCtx, "admin")
|
||||
i.scheduleCancel()
|
||||
i.scheduleCancel = cancel
|
||||
i.scheduleUpdateLogRecord(newCtx)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -164,8 +183,15 @@ func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSour
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *imlLogModule) OnComplete() {
|
||||
func (i *imlLogModule) OnInit() {
|
||||
register.Handle(func(v server.Server) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = utils.SetUserId(ctx, "admin")
|
||||
//i.scheduleCtx = ctx
|
||||
i.scheduleCancel = cancel
|
||||
i.scheduleUpdateLogRecord(ctx)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
|
||||
@@ -222,3 +248,152 @@ func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
oneSecond = 1
|
||||
oneMinute = 60
|
||||
oneHour = 60 * oneMinute
|
||||
oneDay = 24 * oneHour
|
||||
)
|
||||
|
||||
// 定时更新历史记录
|
||||
func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) {
|
||||
driver, has := log_driver.GetDriver("loki")
|
||||
if !has {
|
||||
eosc_log.Error("driver loki not found")
|
||||
return
|
||||
}
|
||||
info, err := i.service.GetLogSource(ctx, "loki")
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
eosc_log.Errorf("get log source loki error: %s", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
before90Days := now.Add(-7 * 24 * time.Hour)
|
||||
beginTime := before90Days
|
||||
if info.LastPullTime.After(before90Days) {
|
||||
before90Days = info.LastPullTime
|
||||
}
|
||||
pauseTime := now
|
||||
historyFinish := false
|
||||
go func() {
|
||||
eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05"))
|
||||
ticket := time.NewTicker(1 * time.Minute)
|
||||
defer ticket.Stop()
|
||||
for {
|
||||
now = time.Now()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticket.C:
|
||||
switch {
|
||||
case now.Sub(beginTime) > oneDay:
|
||||
endTime := beginTime.Add(oneDay)
|
||||
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log record error: %s", err)
|
||||
continue
|
||||
}
|
||||
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
||||
LastPullTime: &endTime,
|
||||
})
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log source error: %s", err)
|
||||
continue
|
||||
}
|
||||
beginTime = endTime
|
||||
case now.Sub(pauseTime) <= oneDay:
|
||||
endTime := pauseTime
|
||||
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log record error: %s", err)
|
||||
historyFinish = true
|
||||
return
|
||||
}
|
||||
historyFinish = true
|
||||
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
||||
LastPullTime: &endTime,
|
||||
})
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log source error: %s", err)
|
||||
return
|
||||
}
|
||||
eosc_log.Infof("update log record finish")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05"))
|
||||
ticket := time.NewTicker(10 * time.Second)
|
||||
defer ticket.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticket.C:
|
||||
end := time.Now()
|
||||
start := end.Add(-1 * time.Minute)
|
||||
err = i.updateLogRecord(ctx, driver, start, end)
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log record error: %s", err)
|
||||
continue
|
||||
}
|
||||
if historyFinish {
|
||||
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
|
||||
LastPullTime: &end,
|
||||
})
|
||||
if err != nil {
|
||||
eosc_log.Errorf("update log source error: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error {
|
||||
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
||||
}
|
||||
logs, err := driver.LogRecords(c.Cluster, start, end)
|
||||
if err != nil {
|
||||
|
||||
return fmt.Errorf("get log records error: %s", err)
|
||||
}
|
||||
for _, l := range logs {
|
||||
err = i.service.InsertLog(ctx, "loki", &log.InsertLog{
|
||||
ID: l.ID,
|
||||
Driver: "loki",
|
||||
Strategy: l.Strategy,
|
||||
API: l.API,
|
||||
Service: l.Service,
|
||||
Method: l.Method,
|
||||
Url: l.Url,
|
||||
RemoteIP: l.RemoteIP,
|
||||
Consumer: l.Consumer,
|
||||
Authorization: l.Authorization,
|
||||
InputToken: l.InputToken,
|
||||
OutputToken: l.OutputToken,
|
||||
TotalToken: l.TotalToken,
|
||||
AIProvider: l.AIProvider,
|
||||
AIModel: l.AIModel,
|
||||
StatusCode: l.StatusCode,
|
||||
ResponseTime: l.ResponseTime,
|
||||
Traffic: l.Traffic,
|
||||
RecordTime: l.RecordTime,
|
||||
})
|
||||
if err != nil {
|
||||
eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
+1
-1
@@ -270,7 +270,7 @@ func (i *imlMcpModule) Invoke(ctx context.Context, req mcp.CallToolRequest) (*mc
|
||||
queryParam.Add(k, value)
|
||||
}
|
||||
case float64:
|
||||
queryParam.Add(k, strconv.FormatFloat(v, 'e', -1, 64))
|
||||
queryParam.Add(k, strconv.FormatFloat(v, 'f', -1, 64))
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid query param type: %T", v)
|
||||
}
|
||||
|
||||
@@ -21,4 +21,26 @@ type IExecutor interface {
|
||||
InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
|
||||
ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
|
||||
MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error)
|
||||
|
||||
IBasicOverview
|
||||
|
||||
IRestOverview
|
||||
|
||||
IAIOverview
|
||||
}
|
||||
|
||||
type IBasicOverview interface {
|
||||
RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
|
||||
TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error)
|
||||
ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error)
|
||||
}
|
||||
|
||||
type IRestOverview interface {
|
||||
TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
|
||||
AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
|
||||
SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
|
||||
}
|
||||
|
||||
type IAIOverview interface {
|
||||
TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package influxdb_v2
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -23,6 +24,9 @@ import (
|
||||
"github.com/APIParkLab/APIPark/service/monitor"
|
||||
)
|
||||
|
||||
var _ driver.IAIOverview = (*executor)(nil)
|
||||
var _ driver.IRestOverview = (*executor)(nil)
|
||||
|
||||
func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) {
|
||||
var data InfluxdbV2Config
|
||||
err := json.Unmarshal([]byte(cfg), &data)
|
||||
@@ -147,7 +151,7 @@ func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.T
|
||||
|
||||
fieldsConditions := []string{"request", "response"}
|
||||
|
||||
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset)
|
||||
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@@ -166,9 +170,9 @@ func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Tim
|
||||
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"}
|
||||
proxyConditions := []string{"p_total", "p_success", "p_s2xx", "p_s4xx", "p_s5xx"}
|
||||
|
||||
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
|
||||
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@@ -200,9 +204,9 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
requestConditions := []string{"total", "success", "s4xx", "s5xx"}
|
||||
requestConditions := []string{"total", "success", "2xx", "s4xx", "s5xx"}
|
||||
|
||||
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset)
|
||||
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@@ -221,7 +225,7 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
|
||||
|
||||
proxyConditions := []string{"p_total", "p_success"}
|
||||
|
||||
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
|
||||
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@@ -359,5 +363,326 @@ func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, g
|
||||
}
|
||||
|
||||
return resultMap, nil
|
||||
}
|
||||
|
||||
func (e *executor) overviewByStatusCode(ctx context.Context, start, end time.Time, table string, wheres []monitor.MonWhereItem, statusCode []string, dataFields []string, fn flux.AggregateFn) ([]time.Time, map[string][]int64, error) {
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
var returnDates []time.Time
|
||||
var returnResult = make(map[string][]int64)
|
||||
for _, s := range statusCode {
|
||||
newWheres := make([]monitor.MonWhereItem, 0, len(wheres)+1)
|
||||
newWheres = append(newWheres, wheres...)
|
||||
newWheres = append(newWheres, monitor.MonWhereItem{
|
||||
Key: "status_code",
|
||||
Operation: "=",
|
||||
Values: []string{s},
|
||||
})
|
||||
dates, result, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, table, formatFilter(newWheres), dataFields, every, windowOffset, fn)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if len(dates) > 0 {
|
||||
returnDates = dates
|
||||
}
|
||||
|
||||
for _, v := range dataFields {
|
||||
key := fmt.Sprintf("%s_%s", s, v)
|
||||
if _, ok := returnResult[key]; !ok {
|
||||
returnResult[key] = make([]int64, 0, len(returnDates))
|
||||
}
|
||||
returnResult[key] = append(returnResult[key], result[v]...)
|
||||
}
|
||||
}
|
||||
|
||||
return returnDates, returnResult, nil
|
||||
}
|
||||
|
||||
func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
|
||||
|
||||
fieldsConditions := []string{"request", "response"}
|
||||
statusFilters := []string{"2xx", "4xx", "5xx"}
|
||||
dates, overview, err := e.overviewByStatusCode(ctx, start, end, "request", wheres, statusFilters, fieldsConditions, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
s2xxRequest := overview["2xx_request"]
|
||||
s2xxRequestLen := len(s2xxRequest)
|
||||
s4xxRequest := overview["4xx_request"]
|
||||
s4xxRequestLen := len(s4xxRequest)
|
||||
s5xxRequest := overview["5xx_request"]
|
||||
s5xxRequestLen := len(s5xxRequest)
|
||||
|
||||
s2xxResponse := overview["2xx_response"]
|
||||
s2xxResponseLen := len(s2xxResponse)
|
||||
s4xxResponse := overview["4xx_response"]
|
||||
s4xxResponseLen := len(s4xxResponse)
|
||||
s5xxResponse := overview["5xx_response"]
|
||||
s5xxResponseLen := len(s5xxResponse)
|
||||
|
||||
totalOverview := new(monitor.StatusCodeOverview)
|
||||
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
|
||||
for i := range dates {
|
||||
r := new(monitor.StatusCodeOverview)
|
||||
if s2xxRequestLen > i {
|
||||
r.Status2xx = s2xxRequest[i]
|
||||
}
|
||||
if s4xxRequestLen > i {
|
||||
r.Status4xx = s4xxRequest[i]
|
||||
}
|
||||
if s5xxRequestLen > i {
|
||||
r.Status5xx = s5xxRequest[i]
|
||||
}
|
||||
if s2xxResponseLen > i {
|
||||
r.Status2xx += s2xxResponse[i]
|
||||
}
|
||||
if s4xxResponseLen > i {
|
||||
r.Status4xx += s4xxResponse[i]
|
||||
}
|
||||
if s5xxResponseLen > i {
|
||||
r.Status5xx += s5xxResponse[i]
|
||||
}
|
||||
r.StatusTotal += r.Status2xx + r.Status4xx + r.Status5xx
|
||||
totalOverview.Status2xx += r.Status2xx
|
||||
totalOverview.Status4xx += r.Status4xx
|
||||
totalOverview.Status5xx += r.Status5xx
|
||||
totalOverview.StatusTotal += r.StatusTotal
|
||||
|
||||
result = append(result, r)
|
||||
|
||||
}
|
||||
|
||||
return dates, totalOverview, result, nil
|
||||
}
|
||||
|
||||
func (e *executor) aggregateSummary(ctx context.Context, start time.Time, end time.Time, measurement string, bucket string, filters string, fields []string) (map[string]*monitor.Aggregate, error) {
|
||||
if len(fields) == 0 {
|
||||
return nil, fmt.Errorf("fields is empty")
|
||||
}
|
||||
maxFields := make([]string, 0, len(fields))
|
||||
minFields := make([]string, 0, len(fields))
|
||||
avgFields := make([]string, 0, len(fields))
|
||||
for _, field := range fields {
|
||||
maxFields = append(maxFields, field+"_max")
|
||||
minFields = append(minFields, field+"_min")
|
||||
avgFields = append(avgFields, field+"_avg")
|
||||
}
|
||||
maxRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
|
||||
Measurement: measurement,
|
||||
AggregateFn: "max()",
|
||||
Fields: maxFields,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
minRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
|
||||
Measurement: measurement,
|
||||
AggregateFn: "min()",
|
||||
Fields: minFields,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
avgRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
|
||||
Measurement: measurement,
|
||||
AggregateFn: "mean()",
|
||||
Fields: avgFields,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make(map[string]*monitor.Aggregate)
|
||||
for _, field := range fields {
|
||||
a := new(monitor.Aggregate)
|
||||
if v, ok := avgRes[field+"_avg"]; ok {
|
||||
a.Avg = int64(v.(float64))
|
||||
}
|
||||
if v, ok := maxRes[field+"_max"]; ok {
|
||||
a.Max = v.(int64)
|
||||
}
|
||||
if v, ok := minRes[field+"_min"]; ok {
|
||||
a.Min = v.(int64)
|
||||
}
|
||||
|
||||
result[field] = a
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
||||
}
|
||||
|
||||
func (e *executor) SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
fieldsConditions := []string{"timing"}
|
||||
|
||||
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
timing := groupValues["timing"]
|
||||
timingLen := len(timing)
|
||||
result := make([]int64, 0, len(dates))
|
||||
for i := range dates {
|
||||
if timingLen > i {
|
||||
result = append(result, timing[i])
|
||||
}
|
||||
}
|
||||
|
||||
return dates, agg["timing"], result, nil
|
||||
}
|
||||
|
||||
func (e *executor) AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
fieldsConditions := []string{"timing_avg"}
|
||||
|
||||
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.AvgFn)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
timingAvg := groupValues["timing_avg"]
|
||||
timingAvgLen := len(timingAvg)
|
||||
result := make([]int64, 0, len(dates))
|
||||
for i := range dates {
|
||||
if timingAvgLen > i {
|
||||
result = append(result, timingAvg[i])
|
||||
}
|
||||
}
|
||||
|
||||
return dates, agg["timing"], result, nil
|
||||
|
||||
}
|
||||
|
||||
func (e *executor) RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
requestConditions := []string{"total", "s2xx", "s4xx", "s5xx"}
|
||||
|
||||
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
total := requestValues["total"]
|
||||
totalLen := len(total)
|
||||
s2xx := requestValues["s2xx"]
|
||||
s2xxLen := len(s2xx)
|
||||
s4xx := requestValues["s4xx"]
|
||||
s4xxLen := len(s4xx)
|
||||
s5xx := requestValues["s5xx"]
|
||||
s5xxLen := len(s5xx)
|
||||
totalOverview := new(monitor.StatusCodeOverview)
|
||||
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
|
||||
for i := range dates {
|
||||
r := new(monitor.StatusCodeOverview)
|
||||
if totalLen > i {
|
||||
r.StatusTotal = total[i]
|
||||
totalOverview.StatusTotal += r.StatusTotal
|
||||
}
|
||||
if s2xxLen > i {
|
||||
r.Status2xx = s2xx[i]
|
||||
totalOverview.Status2xx += r.Status2xx
|
||||
}
|
||||
if s4xxLen > i {
|
||||
r.Status4xx = s4xx[i]
|
||||
totalOverview.Status4xx += r.Status4xx
|
||||
}
|
||||
if s5xxLen > i {
|
||||
r.Status5xx = s5xx[i]
|
||||
totalOverview.Status5xx += r.Status5xx
|
||||
}
|
||||
result = append(result, r)
|
||||
}
|
||||
return dates, totalOverview, result, nil
|
||||
}
|
||||
|
||||
func (e *executor) TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) {
|
||||
filters := formatFilter(wheres)
|
||||
newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end)
|
||||
|
||||
statisticsConf := []*flux.StatisticsFilterConf{
|
||||
{
|
||||
Measurement: "request",
|
||||
AggregateFn: "sum()",
|
||||
Fields: []string{"total", "request", "response", "input_token", "output_token"},
|
||||
},
|
||||
{
|
||||
Measurement: "proxy",
|
||||
AggregateFn: "sum()",
|
||||
Fields: []string{"p_total"},
|
||||
},
|
||||
}
|
||||
|
||||
results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
topN := make([]*monitor.TopN, 0, len(results))
|
||||
for key, result := range results {
|
||||
n := new(monitor.TopN)
|
||||
n.Key = key
|
||||
n.Request = result.Total
|
||||
n.Token = result.TotalToken
|
||||
n.Traffic = result.TotalRequest + result.TotalResponse
|
||||
topN = append(topN, n)
|
||||
}
|
||||
|
||||
return topN, nil
|
||||
}
|
||||
|
||||
func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) {
|
||||
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
requestConditions := []string{"total_token", "input_token", "output_token"}
|
||||
|
||||
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
//total := requestValues["total_token"]
|
||||
//totalLen := len(total)
|
||||
input := requestValues["input_token"]
|
||||
inputLen := len(input)
|
||||
output := requestValues["output_token"]
|
||||
outputLen := len(output)
|
||||
totalOverview := new(monitor.TokenOverview)
|
||||
result := make([]*monitor.TokenOverview, 0, len(dates))
|
||||
for i := range dates {
|
||||
r := new(monitor.TokenOverview)
|
||||
if inputLen > i {
|
||||
r.InputToken = input[i]
|
||||
}
|
||||
if outputLen > i {
|
||||
r.OutputToken = output[i]
|
||||
}
|
||||
r.TotalToken = r.InputToken + r.OutputToken
|
||||
totalOverview.InputToken += r.InputToken
|
||||
totalOverview.OutputToken += r.OutputToken
|
||||
totalOverview.TotalToken += r.TotalToken
|
||||
result = append(result, r)
|
||||
}
|
||||
return dates, totalOverview, result, nil
|
||||
}
|
||||
|
||||
func (e *executor) ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error) {
|
||||
newStartTime, every, offset, bucket := getTimeIntervalAndBucket(start, end)
|
||||
filters := formatFilter(wheres)
|
||||
|
||||
return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, offset, "app")
|
||||
|
||||
}
|
||||
|
||||
@@ -14,7 +14,8 @@ import (
|
||||
type IFluxQuery interface {
|
||||
CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
|
||||
CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
|
||||
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error)
|
||||
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error)
|
||||
CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error)
|
||||
// CommonQueryOnce 查询只返回一条结果
|
||||
CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error)
|
||||
CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error)
|
||||
@@ -61,18 +62,30 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI,
|
||||
totalRequest := common.FmtIntFromInterface(maps["request"])
|
||||
maxRequest := common.FmtIntFromInterface(maps["request_max"])
|
||||
minRequest := common.FmtIntFromInterface(maps["request_min"])
|
||||
totalResponse := common.FmtIntFromInterface(maps["response"])
|
||||
maxResponse := common.FmtIntFromInterface(maps["response_max"])
|
||||
minResponse := common.FmtIntFromInterface(maps["response_min"])
|
||||
inputToken := common.FmtIntFromInterface(maps["input_token"])
|
||||
outputToken := common.FmtIntFromInterface(maps["output_token"])
|
||||
//totalToken := common.FmtIntFromInterface(maps["total_token"])
|
||||
//maxToken := common.FmtIntFromInterface(maps["total_token_max"])
|
||||
//minToken := common.FmtIntFromInterface(maps["total_token_min"])
|
||||
|
||||
resultMap[key] = &FluxStatistics{
|
||||
Total: total,
|
||||
Success: success,
|
||||
ProxyTotal: pTotal,
|
||||
ProxySuccess: pSuccess,
|
||||
TotalTiming: totalTiming,
|
||||
MaxTiming: maxMinTiming,
|
||||
MinTiming: minTiming,
|
||||
TotalRequest: totalRequest,
|
||||
RequestMax: maxRequest,
|
||||
RequestMin: minRequest,
|
||||
Total: total,
|
||||
Success: success,
|
||||
ProxyTotal: pTotal,
|
||||
ProxySuccess: pSuccess,
|
||||
TotalTiming: totalTiming,
|
||||
MaxTiming: maxMinTiming,
|
||||
MinTiming: minTiming,
|
||||
TotalRequest: totalRequest,
|
||||
RequestMax: maxRequest,
|
||||
RequestMin: minRequest,
|
||||
TotalResponse: totalResponse,
|
||||
ResponseMax: maxResponse,
|
||||
ResponseMin: minResponse,
|
||||
TotalToken: inputToken + outputToken,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,10 +141,10 @@ func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.Quer
|
||||
return resultMap, nil
|
||||
}
|
||||
|
||||
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) {
|
||||
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) {
|
||||
fieldConditions := f.assembleTendencyFieldCondition(dataFields)
|
||||
//拼装请求
|
||||
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset)
|
||||
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset, fn)
|
||||
|
||||
log.Info("flux sql=", query)
|
||||
result, err := queryApi.Query(ctx, query)
|
||||
@@ -148,21 +161,46 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s
|
||||
//初始返回内容
|
||||
dates := make([]time.Time, 0, len(resultList))
|
||||
resultMap := make(map[string][]int64, len(dataFields))
|
||||
for _, field := range dataFields {
|
||||
resultMap[field] = make([]int64, 0, len(resultList))
|
||||
}
|
||||
|
||||
for _, res := range resultList {
|
||||
for _, field := range dataFields {
|
||||
resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field]))
|
||||
}
|
||||
t, _ := res["_time"].(time.Time)
|
||||
dates = append(dates, t)
|
||||
dates = append(dates, t.In(time.Local))
|
||||
}
|
||||
|
||||
return dates, resultMap, nil
|
||||
}
|
||||
|
||||
func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) {
|
||||
query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, offset, tag)
|
||||
log.Info("flux sql=", query)
|
||||
result, err := queryApi.Query(ctx, query)
|
||||
if err != nil {
|
||||
log.Error("flux err=", err)
|
||||
return 0, nil, err
|
||||
}
|
||||
dateMap := map[time.Time]map[string]struct{}{}
|
||||
tagMap := make(map[string]struct{})
|
||||
defer result.Close()
|
||||
for result.Next() {
|
||||
date := result.Record().Values()["_start"].(time.Time).In(time.Local)
|
||||
if _, ok := dateMap[date]; !ok {
|
||||
dateMap[date] = map[string]struct{}{}
|
||||
}
|
||||
if vv, ok := result.Record().Values()[tag]; ok {
|
||||
v := vv.(string)
|
||||
tagMap[v] = struct{}{}
|
||||
dateMap[date][v] = struct{}{}
|
||||
}
|
||||
}
|
||||
returnMap := make(map[time.Time]int64)
|
||||
for k, v := range dateMap {
|
||||
returnMap[k] = int64(len(v))
|
||||
}
|
||||
return int64(len(tagMap)), returnMap, nil
|
||||
}
|
||||
|
||||
func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) {
|
||||
query := f.getCircularMapFlux(start, end, bucket, filters, fieldsConf)
|
||||
|
||||
@@ -172,6 +210,7 @@ func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI,
|
||||
log.Error("flux err=", err)
|
||||
return nil, err
|
||||
}
|
||||
defer result.Close()
|
||||
|
||||
for result.Next() {
|
||||
return result.Record().Values(), nil
|
||||
@@ -270,7 +309,7 @@ from(bucket: "%s")
|
||||
}
|
||||
|
||||
return fmt.Sprintf(`
|
||||
union(tables: [
|
||||
union(tables: [
|
||||
%s
|
||||
])
|
||||
|> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value")
|
||||
@@ -278,23 +317,59 @@ union(tables: [
|
||||
`, strings.Join(streams, ",\n"), groupBy, limitStr)
|
||||
}
|
||||
|
||||
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string {
|
||||
type AggregateFn string
|
||||
|
||||
const (
|
||||
SumFn AggregateFn = "sum"
|
||||
MaxFn AggregateFn = "max"
|
||||
MinFn AggregateFn = "min"
|
||||
AvgFn AggregateFn = "mean"
|
||||
)
|
||||
|
||||
var (
|
||||
fns = map[AggregateFn]struct{}{
|
||||
SumFn: {},
|
||||
MaxFn: {},
|
||||
MinFn: {},
|
||||
}
|
||||
)
|
||||
|
||||
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every, windowOffset string, fn AggregateFn) string {
|
||||
windowOffsetFlux := ""
|
||||
if windowOffset != "" {
|
||||
windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset)
|
||||
}
|
||||
if _, ok := fns[fn]; !ok {
|
||||
fn = SumFn
|
||||
}
|
||||
|
||||
return fmt.Sprintf(`from(bucket: "%s")
|
||||
|> range(start: %d, stop: %d)
|
||||
|> filter(fn: (r) => r["_measurement"] == "%s")
|
||||
%s
|
||||
%s
|
||||
|> group(columns: ["_field"])
|
||||
|> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|
||||
|> aggregateWindow(every: %s, fn: %s, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table,
|
||||
filters, fieldConditions, every, windowOffsetFlux)
|
||||
filters, fieldConditions, every, string(fn), windowOffsetFlux)
|
||||
|
||||
}
|
||||
|
||||
func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, offset, tag string) string {
|
||||
windowOffset := ""
|
||||
if len(offset) > 0 {
|
||||
windowOffset = fmt.Sprintf(", offset: %s", offset)
|
||||
}
|
||||
return fmt.Sprintf(`
|
||||
from(bucket: "%s")
|
||||
|> range(start: %d, stop: %d)
|
||||
|> filter(fn: (r) => r["_measurement"] == "%s")
|
||||
%s
|
||||
|> keep(columns: ["_time", "%s"])
|
||||
|> window(every: %s%s)
|
||||
|> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, windowOffset, tag)
|
||||
}
|
||||
|
||||
// assembleTendencyFieldCondition 封装趋势图需要的Field数据
|
||||
func (f *fluxQuery) assembleTendencyFieldCondition(fieldConditions []string) string {
|
||||
/*
|
||||
|
||||
@@ -2,22 +2,30 @@ package flux
|
||||
|
||||
// FluxStatistics flux统计通用字段
|
||||
type FluxStatistics struct {
|
||||
Total int64 `json:"total"` //总数
|
||||
Success int64 `json:"success"` //成功数
|
||||
ProxyTotal int64 `json:"p_total"` //转发总数
|
||||
ProxySuccess int64 `json:"p_success"` //转发成功数
|
||||
TotalTiming int64 `json:"timing"` //平均响应时间
|
||||
MaxTiming int64 `json:"timing_max"` //最大响应时间
|
||||
MinTiming int64 `json:"timing_min"` //最小响应时间
|
||||
TotalRequest int64 `json:"request"` //总请求流量
|
||||
RequestMax int64 `json:"request_max"` //最大流量
|
||||
RequestMin int64 `json:"request_min"` //最小流量
|
||||
Total int64 `json:"total"` //总数
|
||||
Success int64 `json:"success"` //成功数
|
||||
S2xx int64 `json:"s2xx"` //2xx
|
||||
ProxyTotal int64 `json:"p_total"` //转发总数
|
||||
ProxySuccess int64 `json:"p_success"` //转发成功数
|
||||
TotalTiming int64 `json:"timing"` //平均响应时间
|
||||
MaxTiming int64 `json:"timing_max"` //最大响应时间
|
||||
MinTiming int64 `json:"timing_min"` //最小响应时间
|
||||
TotalRequest int64 `json:"request"` //总请求流量
|
||||
RequestMax int64 `json:"request_max"` //最大流量
|
||||
RequestMin int64 `json:"request_min"` //最小流量
|
||||
TotalResponse int64 `json:"response"` //总请求流量
|
||||
ResponseMax int64 `json:"response_max"` //最大流量
|
||||
ResponseMin int64 `json:"response_min"` //最小流量
|
||||
TotalToken int64 `json:"total_token"` //总token流量
|
||||
TokenMax int64 `json:"total_token_max"` //最大token流量
|
||||
TokenMin int64 `json:"total_token_min"` //最小token流量
|
||||
}
|
||||
|
||||
// FluxWarnStatistics flux统计告警通用字段
|
||||
type FluxWarnStatistics struct {
|
||||
Total int64 `json:"total"` //总数
|
||||
Success int64 `json:"success"` //成功数
|
||||
S2xx int64 `json:"s2xx"`
|
||||
S4xx int64 `json:"s4xx"`
|
||||
S5xx int64 `json:"s5xx"`
|
||||
ProxyTotal int64 `json:"p_total"` //转发总数
|
||||
|
||||
@@ -0,0 +1,309 @@
|
||||
-
|
||||
task_name: "apinto_day_request_v1"
|
||||
cron: "0 0 * * *"
|
||||
offset: "2m30s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
|
||||
or
|
||||
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
|
||||
==
|
||||
"retry"
|
||||
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
|
||||
or
|
||||
r._field == "retry_max"
|
||||
or
|
||||
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
|
||||
or
|
||||
r._field == "retry_min"
|
||||
or
|
||||
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
|
||||
or
|
||||
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> mean()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
-
|
||||
task_name: "apinto_day_proxy_v1"
|
||||
cron: "0 0 * * *"
|
||||
offset: "2m45s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
|
||||
==
|
||||
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
|
||||
==
|
||||
"p_response" or r._field == "p_retry",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
|
||||
==
|
||||
"p_response_max" or r._field == "p_retry_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
|
||||
from(bucket: "apinto/hour")
|
||||
|> range(start: -1d)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
|
||||
==
|
||||
"p_response_min" or r._field == "p_retry_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/day",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
@@ -0,0 +1,309 @@
|
||||
-
|
||||
task_name: "apinto_hour_request_v1"
|
||||
cron: "0 * * * *"
|
||||
offset: "1m30s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
|
||||
or
|
||||
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
|
||||
==
|
||||
"retry"
|
||||
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
|
||||
or
|
||||
r._field == "retry_max"
|
||||
or
|
||||
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
|
||||
or
|
||||
r._field == "retry_min"
|
||||
or
|
||||
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
|
||||
or
|
||||
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> mean()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
-
|
||||
task_name: "apinto_hour_proxy_v1"
|
||||
cron: "0 * * * *"
|
||||
offset: "1m45s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
|
||||
==
|
||||
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
|
||||
==
|
||||
"p_response" or r._field == "p_retry",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
|
||||
==
|
||||
"p_response_max" or r._field == "p_retry_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
|
||||
from(bucket: "apinto/minute")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
|
||||
==
|
||||
"p_response_min" or r._field == "p_retry_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
"_measurement",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> to(
|
||||
bucket: "apinto/hour",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
+455
-680
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,307 @@
|
||||
-
|
||||
task_name: "apinto_week_request_v1"
|
||||
cron: "0 0 * * 1"
|
||||
offset: "3m30s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
|
||||
or
|
||||
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
|
||||
==
|
||||
"retry"
|
||||
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
|
||||
or
|
||||
r._field == "retry_max"
|
||||
or
|
||||
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
|
||||
or
|
||||
r._field == "retry_min"
|
||||
or
|
||||
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "request")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
|
||||
or
|
||||
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> mean()
|
||||
|> set(key: "_measurement", value: "request")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
-
|
||||
task_name: "apinto_week_proxy_v1"
|
||||
cron: "0 0 * * 1"
|
||||
offset: "3m45s"
|
||||
flux: |
|
||||
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
|
||||
==
|
||||
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
|
||||
==
|
||||
"p_response" or r._field == "p_retry",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> sum()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
|
||||
==
|
||||
"p_response_max" or r._field == "p_retry_max",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> max()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
from(bucket: "apinto/day")
|
||||
|> range(start: -1w)
|
||||
|> filter(fn: (r) => r._measurement == "proxy")
|
||||
|> filter(
|
||||
fn: (r) =>
|
||||
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
|
||||
==
|
||||
"p_response_min" or r._field == "p_retry_min",
|
||||
)
|
||||
|> group(
|
||||
columns: [
|
||||
"api",
|
||||
"app",
|
||||
"upstream",
|
||||
"addr",
|
||||
"method",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
"_field",
|
||||
],
|
||||
)
|
||||
|> min()
|
||||
|> set(key: "_measurement", value: "proxy")
|
||||
|> to(
|
||||
bucket: "apinto/week",
|
||||
timeColumn: "_start",
|
||||
tagColumns: [
|
||||
"api",
|
||||
"app",
|
||||
"method",
|
||||
"upstream",
|
||||
"addr",
|
||||
"node",
|
||||
"cluster",
|
||||
"provider",
|
||||
"api_kind",
|
||||
"status_code",
|
||||
],
|
||||
)
|
||||
@@ -1,13 +1,18 @@
|
||||
package flux
|
||||
|
||||
import (
|
||||
"embed"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
"github.com/eolinker/eosc/log"
|
||||
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
//go:embed influxdb_config/tasks.yaml
|
||||
var tasksData []byte
|
||||
//go:embed tasks/*.yaml
|
||||
var taskReader embed.FS
|
||||
|
||||
var (
|
||||
taskList []*TaskConf
|
||||
@@ -22,9 +27,28 @@ type TaskConf struct {
|
||||
|
||||
func initTasksConfig() {
|
||||
conf := make([]*TaskConf, 0, 15)
|
||||
err := yaml.Unmarshal(tasksData, &conf)
|
||||
files, err := taskReader.ReadDir("tasks")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
panic(fmt.Sprintf("read tasks dir error: %v", err))
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") {
|
||||
continue
|
||||
}
|
||||
name := fmt.Sprintf("tasks/%s", file.Name())
|
||||
data, err := taskReader.ReadFile(name)
|
||||
if err != nil {
|
||||
log.Errorf("read file(%s) error: %v", name, err)
|
||||
continue
|
||||
}
|
||||
tmp := make([]*TaskConf, 0, 15)
|
||||
err = yaml.Unmarshal(data, &tmp)
|
||||
if err != nil {
|
||||
log.Errorf("unmarshal file(%s) error: %v", name, err)
|
||||
continue
|
||||
}
|
||||
conf = append(conf, tmp...)
|
||||
|
||||
}
|
||||
taskList = conf
|
||||
}
|
||||
|
||||
@@ -15,9 +15,9 @@ const (
|
||||
tenDay = 10 * oneDay
|
||||
oneYear = 365 * oneDay
|
||||
|
||||
bucketMinuteRetention = (7 - 1) * oneDay
|
||||
bucketHourRetention = (90 - 1) * oneDay
|
||||
bucketDayRetention = (5*365 - 1) * oneDay
|
||||
bucketMinuteRetention = (7) * oneDay
|
||||
bucketHourRetention = (90) * oneDay
|
||||
bucketDayRetention = (5 * 365) * oneDay
|
||||
|
||||
bucketMinute = "apinto/minute"
|
||||
bucketHour = "apinto/hour"
|
||||
@@ -127,11 +127,11 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
|
||||
switch minimumBucket {
|
||||
case bucketMinute:
|
||||
offset := ""
|
||||
offsetTime := startTime.Minute() % 5
|
||||
offsetTime := startTime.Minute() % 10
|
||||
if offsetTime != 0 {
|
||||
offset = fmt.Sprintf("%dm", offsetTime)
|
||||
}
|
||||
return startTime, "5m", offset, bucketMinute
|
||||
return startTime, "10m", offset, bucketMinute
|
||||
|
||||
case bucketHour:
|
||||
newStart := formatStartTimeHour(startTime, location)
|
||||
@@ -148,7 +148,15 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
|
||||
} else if diff <= tenDay {
|
||||
|
||||
switch minimumBucket {
|
||||
case bucketMinute, bucketHour:
|
||||
case bucketMinute:
|
||||
offset := ""
|
||||
offsetTime := startTime.Minute()
|
||||
if offsetTime != 0 {
|
||||
offset = fmt.Sprintf("%dm", offsetTime)
|
||||
}
|
||||
return startTime, "1h", offset, bucketMinute
|
||||
|
||||
case bucketHour:
|
||||
newStart := formatStartTimeHour(startTime, location)
|
||||
return newStart, "1h", "", bucketHour
|
||||
case bucketDay:
|
||||
|
||||
@@ -138,3 +138,101 @@ type MonitorCluster struct {
|
||||
Name string `json:"name"`
|
||||
Enable bool `json:"enable"`
|
||||
}
|
||||
|
||||
type ChartOverview struct {
|
||||
}
|
||||
|
||||
type StatusCodeOverview struct {
|
||||
Status2xx int64 `json:"2xx"` //状态码2xx数
|
||||
Status4xx int64 `json:"4xx"`
|
||||
Status5xx int64 `json:"5xx"` //状态码5xx数
|
||||
}
|
||||
|
||||
type TokenOverview struct {
|
||||
TotalToken int64 `json:"total_token"` //总token流量
|
||||
OutputToken int64 `json:"output_token"`
|
||||
InputToken int64 `json:"input_token"` //最小token流量
|
||||
}
|
||||
|
||||
type TokenFloatOverview struct {
|
||||
TotalToken float64 `json:"total_token"` //总token流量
|
||||
OutputToken float64 `json:"output_token"`
|
||||
InputToken float64 `json:"input_token"` //最小token流量
|
||||
}
|
||||
|
||||
type ChartAIOverview struct {
|
||||
RequestOverview []*StatusCodeOverview `json:"request_overview"`
|
||||
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
|
||||
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
|
||||
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
|
||||
|
||||
RequestTotal int64 `json:"request_total"`
|
||||
Request2xxTotal int64 `json:"request_2xx_total"`
|
||||
Request4xxTotal int64 `json:"request_4xx_total"`
|
||||
Request5xxTotal int64 `json:"request_5xx_total"`
|
||||
TokenTotal int64 `json:"token_total"` //总token流量
|
||||
InputTokenTotal int64 `json:"input_token_total"`
|
||||
OutputTokenTotal int64 `json:"output_token_total"` //最大token流量
|
||||
TokenOverview []*TokenOverview `json:"token_overview"` //token概况
|
||||
AvgTokenOverview []float64 `json:"avg_token_overview"`
|
||||
AvgTokenPerSubscriberOverview []*TokenFloatOverview `json:"avg_token_per_subscriber_overview"`
|
||||
AvgToken float64 `json:"avg_token"`
|
||||
MaxToken float64 `json:"max_token"`
|
||||
MinToken float64 `json:"min_token"`
|
||||
Date []string `json:"date"`
|
||||
MaxTokenPerSubscriber float64 `json:"max_token_per_subscriber"`
|
||||
MinTokenPerSubscriber float64 `json:"min_token_per_subscriber"`
|
||||
}
|
||||
|
||||
type ChartRestOverview struct {
|
||||
RequestOverview []*StatusCodeOverview `json:"request_overview"` //请求概况
|
||||
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
|
||||
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
|
||||
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
|
||||
|
||||
RequestTotal int64 `json:"request_total"`
|
||||
Request2xxTotal int64 `json:"request_2xx_total"`
|
||||
Request4xxTotal int64 `json:"request_4xx_total"`
|
||||
Request5xxTotal int64 `json:"request_5xx_total"`
|
||||
|
||||
TrafficOverview []*StatusCodeOverview `json:"traffic_overview"` //流量概况
|
||||
Traffic2xxTotal int64 `json:"traffic_2xx_total"`
|
||||
Traffic4xxTotal int64 `json:"traffic_4xx_total"` //流量概况
|
||||
Traffic5xxTotal int64 `json:"traffic_5xx_total"` //流量概况
|
||||
|
||||
AvgResponseTimeOverview []int64 `json:"avg_response_time_overview"` //平均响应时间概况
|
||||
AvgTrafficPerSubscriberOverview []float64 `json:"avg_traffic_per_subscriber_overview"`
|
||||
TrafficTotal int64 `json:"traffic_total"`
|
||||
AvgResponseTime int64 `json:"avg_response_time"` //平均响应时间
|
||||
MaxResponseTime int64 `json:"max_response_time"` //最大响应时间
|
||||
MinResponseTime int64 `json:"min_response_time"` //最小响应时间
|
||||
Date []string `json:"date"`
|
||||
MaxTrafficPerSubscriber float64 `json:"max_traffic_per_subscriber"`
|
||||
MinTrafficPerSubscriber float64 `json:"min_traffic_per_subscriber"`
|
||||
}
|
||||
|
||||
type ServiceChartRestOverview struct {
|
||||
EnableMCP bool `json:"enable_mcp"`
|
||||
SubscriberNum int64 `json:"subscriber_num"`
|
||||
APINum int64 `json:"api_num"`
|
||||
ServiceKind string `json:"service_kind"`
|
||||
AvailableMonitor bool `json:"available_monitor"`
|
||||
*ChartRestOverview
|
||||
}
|
||||
|
||||
type ServiceChartAIOverview struct {
|
||||
EnableMCP bool `json:"enable_mcp"`
|
||||
SubscriberNum int64 `json:"subscriber_num"`
|
||||
APINum int64 `json:"api_num"`
|
||||
ServiceKind string `json:"service_kind"`
|
||||
AvailableMonitor bool `json:"available_monitor"`
|
||||
*ChartAIOverview
|
||||
}
|
||||
|
||||
type TopN struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Request string `json:"request"`
|
||||
Traffic string `json:"traffic,omitempty"`
|
||||
Token string `json:"token,omitempty"`
|
||||
}
|
||||
|
||||
+471
-17
@@ -5,9 +5,13 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/APIParkLab/APIPark/common"
|
||||
|
||||
"github.com/APIParkLab/APIPark/gateway"
|
||||
"github.com/eolinker/eosc/log"
|
||||
"github.com/eolinker/go-common/auto"
|
||||
@@ -43,6 +47,439 @@ type imlMonitorStatisticModule struct {
|
||||
apiService api.IAPIService `autowired:""`
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticModule) genOverviewWhere(ctx context.Context, serviceId string, apiKind []string) ([]monitor.MonWhereItem, error) {
|
||||
clusterId := cluster.DefaultClusterID
|
||||
_, err := i.clusterService.Get(ctx, clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wheres, err := i.genCommonWheres(ctx, clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if serviceId != "" {
|
||||
wheres = append(wheres, monitor.MonWhereItem{
|
||||
Key: "provider",
|
||||
Operation: "=",
|
||||
Values: []string{serviceId},
|
||||
})
|
||||
}
|
||||
if len(apiKind) > 0 {
|
||||
wheres = append(wheres, monitor.MonWhereItem{
|
||||
Key: "api_kind",
|
||||
Operation: "in",
|
||||
Values: apiKind,
|
||||
})
|
||||
}
|
||||
return wheres, nil
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) {
|
||||
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"ai"})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
errChan := make(chan error, 3)
|
||||
result := new(monitor_dto.ChartAIOverview)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
|
||||
return t.Format("2006/01/02 15:04")
|
||||
})
|
||||
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
|
||||
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
|
||||
for index, item := range items {
|
||||
consumerNum := consumerMap[date[index]]
|
||||
avgRequestPerSubscriber := 0.0
|
||||
if consumerNum != 0 {
|
||||
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
|
||||
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
|
||||
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
|
||||
}
|
||||
if result.MinRequestPerSubscriber == 0 || result.MinRequestPerSubscriber > avgRequestPerSubscriber {
|
||||
result.MinRequestPerSubscriber = avgRequestPerSubscriber
|
||||
}
|
||||
}
|
||||
|
||||
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
|
||||
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
|
||||
Status2xx: item.Status2xx,
|
||||
Status4xx: item.Status4xx,
|
||||
Status5xx: item.Status5xx,
|
||||
})
|
||||
}
|
||||
|
||||
result.RequestTotal = summary.StatusTotal
|
||||
result.Request2xxTotal = summary.Status2xx
|
||||
result.Request4xxTotal = summary.Status4xx
|
||||
result.Request5xxTotal = summary.Status5xx
|
||||
}()
|
||||
sumResponseTimes := make([]int64, 0)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, _, items, err := executor.SumResponseTimeOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
for _, item := range items {
|
||||
sumResponseTimes = append(sumResponseTimes, item)
|
||||
}
|
||||
}()
|
||||
totalTokens := make([]int64, 0)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
startTime := formatTimeByMinute(start)
|
||||
endTime := formatTimeByMinute(end)
|
||||
date, summary, items, err := executor.TokenOverview(ctx, startTime, endTime, wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
result.TokenOverview = make([]*monitor_dto.TokenOverview, 0, len(items))
|
||||
result.AvgTokenOverview = make([]float64, 0, len(items))
|
||||
result.AvgTokenPerSubscriberOverview = make([]*monitor_dto.TokenFloatOverview, 0, len(items))
|
||||
var maxToken, minToken int64 = 0, 0
|
||||
for index, item := range items {
|
||||
if maxToken < item.TotalToken {
|
||||
maxToken = item.TotalToken
|
||||
}
|
||||
if minToken == 0 || minToken > item.TotalToken {
|
||||
minToken = item.TotalToken
|
||||
}
|
||||
result.TokenOverview = append(result.TokenOverview, &monitor_dto.TokenOverview{
|
||||
TotalToken: item.TotalToken,
|
||||
OutputToken: item.OutputToken,
|
||||
InputToken: item.InputToken,
|
||||
})
|
||||
totalTokens = append(totalTokens, item.TotalToken)
|
||||
consumerNum := consumerMap[date[index]]
|
||||
avgTotalPerSubscriber := 0.0
|
||||
avgOutputPerSubscriber := 0.0
|
||||
avgInputPerSubscriber := 0.0
|
||||
if consumerNum != 0 {
|
||||
avgTotalPerSubscriber = float64(item.TotalToken) / float64(consumerNum)
|
||||
avgOutputPerSubscriber = float64(item.OutputToken) / float64(consumerNum)
|
||||
avgInputPerSubscriber = float64(item.InputToken) / float64(consumerNum)
|
||||
if avgTotalPerSubscriber > result.MaxTokenPerSubscriber {
|
||||
result.MaxTokenPerSubscriber = avgTotalPerSubscriber
|
||||
}
|
||||
if result.MinTokenPerSubscriber == 0 || result.MinTokenPerSubscriber > avgTotalPerSubscriber {
|
||||
result.MinTokenPerSubscriber = avgTotalPerSubscriber
|
||||
}
|
||||
}
|
||||
|
||||
result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenFloatOverview{
|
||||
TotalToken: avgTotalPerSubscriber,
|
||||
OutputToken: avgOutputPerSubscriber,
|
||||
InputToken: avgInputPerSubscriber,
|
||||
})
|
||||
|
||||
}
|
||||
result.TokenTotal = summary.TotalToken
|
||||
result.InputTokenTotal = summary.InputToken
|
||||
result.OutputTokenTotal = summary.OutputToken
|
||||
}()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
}()
|
||||
errs := make([]error, 0, 3)
|
||||
// 收集错误
|
||||
for err := range errChan {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, fmt.Errorf("errors occurred: %v", errs)
|
||||
}
|
||||
sumResponseTime := 0.0
|
||||
var maxTokenPerSecond, minTokenPerSecond float64 = 0, 0
|
||||
|
||||
for index, token := range totalTokens {
|
||||
var p float64 = 0
|
||||
if len(sumResponseTimes) > index && sumResponseTimes[index] > 0 {
|
||||
p = math.Round(float64(token)*1000*100/float64(sumResponseTimes[index])) / 100
|
||||
sumResponseTime += float64(sumResponseTimes[index])
|
||||
}
|
||||
result.AvgTokenOverview = append(result.AvgTokenOverview, p)
|
||||
if maxTokenPerSecond < p {
|
||||
maxTokenPerSecond = p
|
||||
}
|
||||
|
||||
if p > 0 && (minTokenPerSecond == 0 || minTokenPerSecond > p) {
|
||||
minTokenPerSecond = p
|
||||
}
|
||||
}
|
||||
if sumResponseTime > 0 {
|
||||
result.AvgToken = math.Round(float64(result.TokenTotal)*1000*100/sumResponseTime) / 100
|
||||
}
|
||||
result.MaxToken = maxTokenPerSecond
|
||||
result.MinToken = minTokenPerSecond
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) {
|
||||
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"rest"})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
errChan := make(chan error, 2)
|
||||
result := new(monitor_dto.ChartRestOverview)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
|
||||
return t.Format("2006/01/02 15:04")
|
||||
})
|
||||
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
|
||||
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
|
||||
for index, item := range items {
|
||||
consumerNum := consumerMap[date[index]]
|
||||
avgRequestPerSubscriber := 0.0
|
||||
if consumerNum != 0 {
|
||||
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
|
||||
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
|
||||
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
|
||||
}
|
||||
if result.MinRequestPerSubscriber == 0 || avgRequestPerSubscriber < result.MinRequestPerSubscriber {
|
||||
result.MinRequestPerSubscriber = avgRequestPerSubscriber
|
||||
}
|
||||
}
|
||||
|
||||
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
|
||||
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
|
||||
Status2xx: item.Status2xx,
|
||||
Status4xx: item.Status4xx,
|
||||
Status5xx: item.Status5xx,
|
||||
})
|
||||
}
|
||||
|
||||
result.RequestTotal = summary.StatusTotal
|
||||
result.Request2xxTotal = summary.Status2xx
|
||||
result.Request4xxTotal = summary.Status4xx
|
||||
result.Request5xxTotal = summary.Status5xx
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
startTime := formatTimeByMinute(start)
|
||||
endTime := formatTimeByMinute(end)
|
||||
_, summary, items, err := executor.AvgResponseTimeOverview(ctx, startTime, endTime, wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
for _, item := range items {
|
||||
if item > result.MaxResponseTime {
|
||||
result.MaxResponseTime = item
|
||||
}
|
||||
|
||||
if item > 0 && (result.MinResponseTime == 0 || item < result.MinResponseTime) {
|
||||
result.MinResponseTime = item
|
||||
}
|
||||
}
|
||||
result.AvgResponseTimeOverview = items
|
||||
result.AvgResponseTime = summary.Avg
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
startTime := formatTimeByMinute(start)
|
||||
endTime := formatTimeByMinute(end)
|
||||
date, summary, items, err := executor.TrafficOverviewByStatusCode(ctx, startTime, endTime, wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
result.TrafficOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
|
||||
result.AvgTrafficPerSubscriberOverview = make([]float64, 0, len(items))
|
||||
for index, item := range items {
|
||||
result.TrafficOverview = append(result.TrafficOverview, &monitor_dto.StatusCodeOverview{
|
||||
Status2xx: item.Status2xx,
|
||||
Status4xx: item.Status4xx,
|
||||
Status5xx: item.Status5xx,
|
||||
})
|
||||
consumerNum := consumerMap[date[index]]
|
||||
avgTrafficPerSubscriber := 0.0
|
||||
if consumerNum != 0 {
|
||||
avgTrafficPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
|
||||
if avgTrafficPerSubscriber > result.MaxTrafficPerSubscriber {
|
||||
result.MaxTrafficPerSubscriber = avgTrafficPerSubscriber
|
||||
}
|
||||
if result.MinTrafficPerSubscriber == 0 || result.MinTrafficPerSubscriber > avgTrafficPerSubscriber {
|
||||
result.MinTrafficPerSubscriber = avgTrafficPerSubscriber
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, avgTrafficPerSubscriber)
|
||||
}
|
||||
result.TrafficTotal = summary.StatusTotal
|
||||
result.Traffic2xxTotal = summary.Status2xx
|
||||
result.Traffic4xxTotal = summary.Status4xx
|
||||
result.Traffic5xxTotal = summary.Status5xx
|
||||
|
||||
}()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
}()
|
||||
errs := make([]error, 0, 3)
|
||||
// 收集错误
|
||||
for err := range errChan {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, fmt.Errorf("errors occurred: %v", errs)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *monitor_dto.TopN {
|
||||
n := &monitor_dto.TopN{
|
||||
Id: id,
|
||||
Name: name,
|
||||
Request: common.FormatCountInt64(item.Request),
|
||||
}
|
||||
switch apiKind {
|
||||
case "rest":
|
||||
n.Traffic = common.FormatByte(item.Traffic)
|
||||
case "ai":
|
||||
n.Token = common.FormatCountInt64(item.Token)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
|
||||
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{apiKind})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
errChan := make(chan error, 2)
|
||||
var wg sync.WaitGroup
|
||||
apisResult, consumersResult := make([]*monitor_dto.TopN, 0), make([]*monitor_dto.TopN, 0)
|
||||
var errs []error
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "api", wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if len(result) < 1 {
|
||||
return
|
||||
}
|
||||
apiIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
|
||||
return t.Key
|
||||
})
|
||||
apis, err := i.apiService.ListInfo(ctx, apiIds...)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
apiMap := utils.SliceToMap(apis, func(t *api.Info) string {
|
||||
return t.UUID
|
||||
})
|
||||
for _, item := range result {
|
||||
if v, ok := apiMap[item.Key]; ok {
|
||||
apisResult = append(apisResult, generateTopN(v.UUID, v.Name, item, apiKind))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "app", wheres)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if len(result) < 1 {
|
||||
return
|
||||
}
|
||||
appIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
|
||||
return t.Key
|
||||
})
|
||||
apps, err := i.serviceService.AppList(ctx, appIds...)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
appMap := utils.SliceToMap(apps, func(t *service.Service) string {
|
||||
return t.Id
|
||||
})
|
||||
appMap["apipark-global"] = &service.Service{
|
||||
Id: "apipark-global",
|
||||
Name: "System Consumer",
|
||||
}
|
||||
for _, item := range result {
|
||||
if v, ok := appMap[item.Key]; ok {
|
||||
consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind))
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
// 收集所有错误
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
}()
|
||||
|
||||
// 收集错误
|
||||
for err := range errChan {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, nil, fmt.Errorf("errors occurred: %v", errs)
|
||||
}
|
||||
return apisResult, consumersResult, nil
|
||||
}
|
||||
|
||||
func (i *imlMonitorStatisticModule) ApiStatistics(ctx context.Context, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) {
|
||||
clusterId := cluster.DefaultClusterID
|
||||
_, err := i.clusterService.Get(ctx, clusterId)
|
||||
@@ -142,6 +579,10 @@ func (i *imlMonitorStatisticModule) SubscriberStatistics(ctx context.Context, in
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apps = append(apps, &service.Service{
|
||||
Id: "apipark-global",
|
||||
Name: "System Consumer",
|
||||
})
|
||||
appIds := utils.SliceToSlice(apps, func(p *service.Service) string {
|
||||
return p.Id
|
||||
})
|
||||
@@ -350,18 +791,27 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var service []*service.Service
|
||||
var services []*service.Service
|
||||
switch groupBy {
|
||||
case "app":
|
||||
service, err = i.serviceService.AppList(ctx)
|
||||
services, err = i.serviceService.AppList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
services = append(services, &service.Service{
|
||||
Id: "apipark-global",
|
||||
Name: "System Consumer",
|
||||
})
|
||||
|
||||
case "provider":
|
||||
service, err = i.serviceService.ServiceList(ctx)
|
||||
services, err = i.serviceService.ServiceList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, errors.New("invalid group by")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wheres, err := i.genCommonWheres(ctx, clusterId)
|
||||
if err != nil {
|
||||
@@ -379,7 +829,7 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
|
||||
}
|
||||
|
||||
result := make([]*monitor_dto.ServiceStatisticBasicItem, 0, len(statisticMap))
|
||||
for _, item := range service {
|
||||
for _, item := range services {
|
||||
|
||||
statisticItem := &monitor_dto.ServiceStatisticBasicItem{
|
||||
Id: item.Id,
|
||||
@@ -445,17 +895,21 @@ func (i *imlMonitorStatisticModule) ApiStatisticsOnSubscriber(ctx context.Contex
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 根据订阅ID查询订阅的服务列表
|
||||
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceIds := utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
|
||||
return t.Service
|
||||
})
|
||||
if len(serviceIds) < 1 {
|
||||
return nil, nil
|
||||
serviceIds := make([]string, 0)
|
||||
if subscriberId != "apipark-global" {
|
||||
// 根据订阅ID查询订阅的服务列表
|
||||
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceIds = utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
|
||||
return t.Service
|
||||
})
|
||||
if len(serviceIds) < 1 {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
apiInfos, err := i.apiService.ListInfoForServices(ctx, serviceIds...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -43,6 +43,10 @@ type IMonitorStatisticModule interface {
|
||||
ApiStatisticsOnProvider(ctx context.Context, providerId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
|
||||
ApiStatisticsOnSubscriber(ctx context.Context, subscriberId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
|
||||
SubscriberStatisticsOnApi(ctx context.Context, apiId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ServiceStatisticBasicItem, error)
|
||||
|
||||
AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error)
|
||||
RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error)
|
||||
Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
|
||||
}
|
||||
|
||||
type IMonitorConfigModule interface {
|
||||
|
||||
+29
-22
@@ -66,28 +66,28 @@ type imlPublishModule struct {
|
||||
|
||||
func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, clientDriver gateway.IClientDriver) error {
|
||||
return nil
|
||||
projects, err := m.serviceService.List(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
|
||||
return p.Id
|
||||
})
|
||||
for _, projectId := range projectIds {
|
||||
releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if releaseInfo == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = clientDriver.Project().Online(ctx, releaseInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
//projects, err := m.serviceService.List(ctx)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
|
||||
// return p.Id
|
||||
//})
|
||||
//for _, projectId := range projectIds {
|
||||
// releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if releaseInfo == nil {
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// err = clientDriver.Project().Online(ctx, releaseInfo)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//}
|
||||
//return nil
|
||||
}
|
||||
|
||||
func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, commitId string) (*gateway.ProjectRelease, error) {
|
||||
@@ -110,6 +110,10 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
|
||||
strategyCommitIds = append(strategyCommitIds, c.Commit)
|
||||
}
|
||||
}
|
||||
serviceInfo, err := m.serviceService.Get(ctx, projectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiInfos, err := m.apiService.ListInfo(ctx, apiIds...)
|
||||
if err != nil {
|
||||
@@ -140,6 +144,9 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
|
||||
},
|
||||
Path: a.Path,
|
||||
Methods: a.Methods,
|
||||
Labels: map[string]string{
|
||||
"api_kind": serviceInfo.Kind.String(),
|
||||
},
|
||||
//Service: a.Upstream,
|
||||
}
|
||||
if hasUpstream {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
type Item struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Methods []string `json:"methods"`
|
||||
Protocols []string `json:"protocols"`
|
||||
Path string `json:"request_path"`
|
||||
|
||||
@@ -205,6 +205,7 @@ func (i *imlRouterModule) Search(ctx context.Context, keyword string, serviceId
|
||||
}
|
||||
return &router_dto.Item{
|
||||
Id: item.UUID,
|
||||
Name: item.Name,
|
||||
Methods: item.Methods,
|
||||
Protocols: protocols,
|
||||
Path: item.Path,
|
||||
|
||||
@@ -221,3 +221,80 @@ type ExportApp struct {
|
||||
Description string `json:"description"`
|
||||
Team string `json:"team"`
|
||||
}
|
||||
|
||||
type Overview struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
EnableMCP bool `json:"enable_mcp"`
|
||||
ServiceKind string `json:"service_kind"`
|
||||
SubscriberNum int64 `json:"subscriber_num"`
|
||||
InvokeNum int64 `json:"invoke_num"`
|
||||
Logo string `json:"logo"`
|
||||
AvailableMonitor bool `json:"available_monitor"`
|
||||
IsReleased bool `json:"is_released"`
|
||||
Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"`
|
||||
APINum int64 `json:"api_num"`
|
||||
}
|
||||
|
||||
type AILogItem struct {
|
||||
Id string `json:"id"`
|
||||
API auto.Label `json:"api" aolabel:"api"`
|
||||
Status int64 `json:"status"`
|
||||
LogTime auto.TimeLabel `json:"log_time"`
|
||||
Ip string `json:"ip"`
|
||||
Token int64 `json:"token"`
|
||||
TokenPerSecond int64 `json:"token_per_second"`
|
||||
Consumer auto.Label `json:"consumer" aolabel:"service"`
|
||||
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
type RestLogItem struct {
|
||||
Id string `json:"id"`
|
||||
API auto.Label `json:"api" aolabel:"api"`
|
||||
Status int64 `json:"status"`
|
||||
LogTime auto.TimeLabel `json:"log_time"`
|
||||
Ip string `json:"ip"`
|
||||
Consumer auto.Label `json:"consumer" aolabel:"service"`
|
||||
ResponseTime string `json:"response_time"`
|
||||
Traffic string `json:"traffic"`
|
||||
}
|
||||
|
||||
type RestLogInfo struct {
|
||||
Id string `json:"id"`
|
||||
API auto.Label `json:"api" aolabel:"api"`
|
||||
Consumer auto.Label `json:"consumer" aolabel:"service"`
|
||||
IsSystemConsumer bool `json:"is_system_consumer"`
|
||||
Status int64 `json:"status"`
|
||||
Ip string `json:"ip"`
|
||||
ResponseTime string `json:"response_time"`
|
||||
Traffic string `json:"traffic"`
|
||||
LogTime auto.TimeLabel `json:"log_time"`
|
||||
Request OriginRequest `json:"request"`
|
||||
Response OriginRequest `json:"response"`
|
||||
}
|
||||
|
||||
type AILogInfo struct {
|
||||
Id string `json:"id"`
|
||||
API auto.Label `json:"api" aolabel:"api"`
|
||||
Consumer auto.Label `json:"consumer" aolabel:"service"`
|
||||
IsSystemConsumer bool `json:"is_system_consumer"`
|
||||
Status int64 `json:"status"`
|
||||
Ip string `json:"ip"`
|
||||
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
|
||||
Model string `json:"model"`
|
||||
LogTime auto.TimeLabel `json:"log_time"`
|
||||
Request OriginAIRequest `json:"request"`
|
||||
Response OriginAIRequest `json:"response"`
|
||||
}
|
||||
|
||||
type OriginRequest struct {
|
||||
Header string `json:"header"`
|
||||
Origin string `json:"origin"`
|
||||
Body string `json:"body"`
|
||||
}
|
||||
|
||||
type OriginAIRequest struct {
|
||||
OriginRequest
|
||||
Token int64 `json:"token"`
|
||||
}
|
||||
|
||||
+215
-6
@@ -5,10 +5,13 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/APIParkLab/APIPark/common"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/eolinker/go-common/register"
|
||||
@@ -27,6 +30,7 @@ import (
|
||||
model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime"
|
||||
|
||||
"github.com/APIParkLab/APIPark/resources/access"
|
||||
log_service "github.com/APIParkLab/APIPark/service/log"
|
||||
"github.com/eolinker/eosc/log"
|
||||
"github.com/eolinker/go-common/server"
|
||||
|
||||
@@ -79,14 +83,217 @@ type imlServiceModule struct {
|
||||
tagService tag.ITagService `autowired:""`
|
||||
localModelService ai_local.ILocalModelService `autowired:""`
|
||||
|
||||
serviceTagService service_tag.ITagService `autowired:""`
|
||||
apiService api.IAPIService `autowired:""`
|
||||
apiDocService api_doc.IAPIDocService `autowired:""`
|
||||
clusterService cluster.IClusterService `autowired:""`
|
||||
transaction store.ITransaction `autowired:""`
|
||||
serviceTagService service_tag.ITagService `autowired:""`
|
||||
apiService api.IAPIService `autowired:""`
|
||||
apiDocService api_doc.IAPIDocService `autowired:""`
|
||||
clusterService cluster.IClusterService `autowired:""`
|
||||
subscribeServer subscribe.ISubscribeService `autowired:""`
|
||||
|
||||
releaseService release.IReleaseService `autowired:""`
|
||||
serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""`
|
||||
logService log_service.ILogService `autowired:""`
|
||||
|
||||
transaction store.ITransaction `autowired:""`
|
||||
}
|
||||
|
||||
func formatHeader(header string) string {
|
||||
result, err := url.QueryUnescape(header)
|
||||
if err != nil {
|
||||
return header
|
||||
}
|
||||
result = strings.ReplaceAll(result, "&", "\n")
|
||||
result = strings.ReplaceAll(result, "=", ": ")
|
||||
return result
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
|
||||
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
||||
}
|
||||
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.Service != serviceId {
|
||||
return nil, errors.New("service not match")
|
||||
}
|
||||
|
||||
logInfo := &service_dto.RestLogInfo{
|
||||
Id: info.ID,
|
||||
API: auto.UUID(info.API),
|
||||
Consumer: auto.UUID(info.Consumer),
|
||||
Status: info.StatusCode,
|
||||
Ip: info.RemoteIP,
|
||||
ResponseTime: common.FormatTime(info.ResponseTime),
|
||||
Traffic: common.FormatByte(info.Traffic),
|
||||
LogTime: auto.TimeLabel(info.RecordTime),
|
||||
Request: service_dto.OriginRequest{
|
||||
Header: formatHeader(info.RequestHeader),
|
||||
Origin: info.RequestBody,
|
||||
Body: info.RequestBody,
|
||||
},
|
||||
Response: service_dto.OriginRequest{
|
||||
Header: formatHeader(info.ResponseHeader),
|
||||
Origin: info.ResponseBody,
|
||||
Body: info.ResponseBody,
|
||||
},
|
||||
}
|
||||
|
||||
if info.Consumer == "apipark-global" {
|
||||
logInfo.IsSystemConsumer = true
|
||||
logInfo.Consumer = auto.Label{
|
||||
Id: info.Consumer,
|
||||
Name: "System Consumer",
|
||||
}
|
||||
}
|
||||
return logInfo, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
|
||||
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
|
||||
}
|
||||
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.Service != serviceId {
|
||||
return nil, errors.New("service not match")
|
||||
}
|
||||
response, err := parseAIResponse(info.ResponseBody)
|
||||
if err != nil {
|
||||
response = info.ResponseBody
|
||||
}
|
||||
|
||||
logInfo := &service_dto.AILogInfo{
|
||||
Id: info.ID,
|
||||
API: auto.UUID(info.API),
|
||||
Consumer: auto.UUID(info.Consumer),
|
||||
Status: info.StatusCode,
|
||||
Ip: info.RemoteIP,
|
||||
Provider: auto.UUID(info.AIProvider),
|
||||
Model: info.AIModel,
|
||||
LogTime: auto.TimeLabel(info.RecordTime),
|
||||
Request: service_dto.OriginAIRequest{
|
||||
OriginRequest: service_dto.OriginRequest{
|
||||
Header: formatHeader(info.RequestHeader),
|
||||
Origin: info.RequestBody,
|
||||
Body: parseAIRequest(info.RequestBody),
|
||||
},
|
||||
|
||||
Token: info.InputToken,
|
||||
},
|
||||
Response: service_dto.OriginAIRequest{
|
||||
OriginRequest: service_dto.OriginRequest{
|
||||
Header: formatHeader(info.ResponseHeader),
|
||||
Origin: info.ResponseBody,
|
||||
Body: response,
|
||||
},
|
||||
|
||||
Token: info.OutputToken,
|
||||
},
|
||||
}
|
||||
if info.Consumer == "apipark-global" {
|
||||
logInfo.IsSystemConsumer = true
|
||||
logInfo.Consumer = auto.Label{
|
||||
Id: info.Consumer,
|
||||
Name: "System Consumer",
|
||||
}
|
||||
}
|
||||
return logInfo, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) {
|
||||
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.RestLogItem {
|
||||
item := &service_dto.RestLogItem{
|
||||
Id: s.ID,
|
||||
API: auto.UUID(s.API),
|
||||
Status: s.StatusCode,
|
||||
LogTime: auto.TimeLabel(s.RecordTime),
|
||||
Ip: s.RemoteIP,
|
||||
Consumer: auto.UUID(s.Consumer),
|
||||
ResponseTime: common.FormatTime(s.ResponseTime),
|
||||
Traffic: common.FormatByte(s.Traffic),
|
||||
}
|
||||
if s.Consumer == "apipark-global" {
|
||||
item.Consumer = auto.Label{
|
||||
Id: s.Consumer,
|
||||
Name: "System Consumer",
|
||||
}
|
||||
}
|
||||
return item
|
||||
}), total, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) {
|
||||
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.AILogItem {
|
||||
item := &service_dto.AILogItem{
|
||||
Id: s.ID,
|
||||
API: auto.UUID(s.API),
|
||||
Status: s.StatusCode,
|
||||
LogTime: auto.TimeLabel(s.RecordTime),
|
||||
Ip: s.RemoteIP,
|
||||
Token: s.TotalToken,
|
||||
TokenPerSecond: s.TotalToken * 1000 / s.ResponseTime,
|
||||
Consumer: auto.UUID(s.Consumer),
|
||||
Provider: auto.UUID(s.AIProvider),
|
||||
Model: s.AIModel,
|
||||
}
|
||||
if s.Consumer == "apipark-global" {
|
||||
item.Consumer = auto.Label{
|
||||
Id: s.Consumer,
|
||||
Name: "System Consumer",
|
||||
}
|
||||
}
|
||||
return item
|
||||
}), total, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) {
|
||||
info, err := i.serviceService.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiCountMap, err := i.apiDocService.APICountByServices(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
subscribeMap, err := i.subscribeServer.CountMapByService(ctx, subscribe.ApplyStatusSubscribe, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := &service_dto.Overview{
|
||||
Id: info.Id,
|
||||
Name: info.Name,
|
||||
Description: info.Description,
|
||||
EnableMCP: info.EnableMCP,
|
||||
ServiceKind: info.Kind.String(),
|
||||
SubscriberNum: subscribeMap[id],
|
||||
Logo: info.Logo,
|
||||
Catalogue: auto.UUID(info.Catalogue),
|
||||
APINum: apiCountMap[id],
|
||||
}
|
||||
_, err = i.releaseService.GetRunning(ctx, id)
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
result.IsReleased = true
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *imlServiceModule) OnInit() {
|
||||
@@ -746,7 +953,9 @@ func (i *imlServiceModule) Delete(ctx context.Context, id string) error {
|
||||
Id: id,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
if err.Error() != "nil" {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = client.Subscribe().Offline(ctx, &gateway.SubscribeRelease{
|
||||
Service: id,
|
||||
|
||||
@@ -34,6 +34,16 @@ type IServiceModule interface {
|
||||
|
||||
//MySimple 获取我的简易项目列表
|
||||
MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error)
|
||||
|
||||
ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error)
|
||||
ILogModule
|
||||
}
|
||||
|
||||
type ILogModule interface {
|
||||
AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error)
|
||||
RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error)
|
||||
RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
|
||||
AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
|
||||
}
|
||||
|
||||
type IServiceDocModule interface {
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ChatCompletionChunk represents the structure of a single chunk in the streaming response
|
||||
type ChatCompletionChunk struct {
|
||||
Object string `json:"object"`
|
||||
Choices []Choice `json:"choices"`
|
||||
}
|
||||
|
||||
// ChatCompletion represents the structure of a non-streaming response
|
||||
type ChatCompletion struct {
|
||||
Object string `json:"object"`
|
||||
Choices []FullChoice `json:"choices"`
|
||||
}
|
||||
|
||||
// Choice represents a choice in the streaming chunk
|
||||
type Choice struct {
|
||||
Delta Delta `json:"delta"`
|
||||
FinishReason *string `json:"finish_reason"`
|
||||
}
|
||||
|
||||
// FullChoice represents a choice in the non-streaming response
|
||||
type FullChoice struct {
|
||||
Message Message `json:"message"`
|
||||
}
|
||||
|
||||
// Delta represents the delta content in a streaming choice
|
||||
type Delta struct {
|
||||
Content string `json:"content"`
|
||||
Role string `json:"role,omitempty"`
|
||||
}
|
||||
|
||||
// Message represents the message content in a non-streaming choice
|
||||
type Message struct {
|
||||
Content string `json:"content"`
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// ParseAIResponse parses both streaming and non-streaming AI responses and returns the concatenated content
|
||||
func parseAIResponse(input string) (string, error) {
|
||||
// First, try to parse as a non-streaming response
|
||||
var nonStreaming ChatCompletion
|
||||
if err := json.Unmarshal([]byte(input), &nonStreaming); err == nil && nonStreaming.Object == "chat.completion" {
|
||||
var result strings.Builder
|
||||
for _, choice := range nonStreaming.Choices {
|
||||
result.WriteString(choice.Message.Content)
|
||||
}
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
// If not non-streaming, parse as streaming response
|
||||
var result strings.Builder
|
||||
scanner := bufio.NewScanner(strings.NewReader(input))
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
// Skip empty lines or [DONE]
|
||||
if line == "" || line == "data: [DONE]" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if line starts with "data: "
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract JSON data
|
||||
jsonData := strings.TrimPrefix(line, "data: ")
|
||||
var chunk ChatCompletionChunk
|
||||
if err := json.Unmarshal([]byte(jsonData), &chunk); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Process each choice
|
||||
for _, choice := range chunk.Choices {
|
||||
// Append content from delta
|
||||
result.WriteString(choice.Delta.Content)
|
||||
// Check if this is the final chunk
|
||||
if choice.FinishReason != nil && *choice.FinishReason == "stop" {
|
||||
return result.String(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
func parseAIRequest(ori string) string {
|
||||
type aiRequest struct {
|
||||
Messages []struct {
|
||||
Role string `json:"role"`
|
||||
Content string `json:"content"`
|
||||
} `json:"messages"`
|
||||
}
|
||||
var req aiRequest
|
||||
err := json.Unmarshal([]byte(ori), &req)
|
||||
if err != nil {
|
||||
return ori
|
||||
}
|
||||
size := len(req.Messages)
|
||||
if size == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return req.Messages[size-1].Content
|
||||
}
|
||||
@@ -22,5 +22,10 @@ func (p *plugin) monitorStatisticApis() []pm3.Api {
|
||||
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend", []string{"context", "rest:data_type", "query:id", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrend),
|
||||
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend/:typ", []string{"context", "rest:data_type", "rest:typ", "query:api", "query:provider", "query:subscriber", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrendInner),
|
||||
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/statistics/:typ", []string{"context", "rest:data_type", "rest:typ", "query:id", "body"}, []string{"statistics"}, p.monitorStatisticController.StatisticsInner),
|
||||
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/rest", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartRestOverview, access.SystemAnalysisRunViewView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/ai", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartAIOverview, access.SystemAnalysisRunViewView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/rest", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.RestTopN, access.SystemAnalysisRunViewView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/ai", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.AITopN, access.SystemAnalysisRunViewView),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,5 +39,15 @@ func (p *plugin) ServiceApis() []pm3.Api {
|
||||
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/swagger/:id", p.serviceController.Swagger),
|
||||
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/apidoc/:id", p.serviceController.Swagger),
|
||||
pm3.CreateApiSimple(http.MethodGet, "/api/v1/export/openapi/:id", p.serviceController.ExportSwagger),
|
||||
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/monitor/top10", []string{"context", "query:service", "query:start", "query:end"}, []string{"apis", "consumers"}, p.serviceController.Top10, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/rest", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.RestLogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/ai", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.AILogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
|
||||
}
|
||||
}
|
||||
|
||||
+8
-4
@@ -1,10 +1,14 @@
|
||||
# 名称:apipark通用镜像
|
||||
# 创建时间:2022-10-25
|
||||
FROM centos:7.9.2009
|
||||
MAINTAINER liujian
|
||||
FROM alpine:latest
|
||||
|
||||
RUN sed -i 's|https://dl-cdn.alpinelinux.org/alpine|https://mirrors.aliyun.com/alpine|g' /etc/apk/repositories \
|
||||
&& apk update \
|
||||
&& apk add --no-cache curl tzdata bind-tools
|
||||
|
||||
ENV TZ=Asia/Shanghai
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN ln -sf /usr/share/zoneinfo/${TZ} /etc/localtime \
|
||||
&& echo "Asia/Shanghai" > /etc/timezone
|
||||
|
||||
ARG APP
|
||||
|
||||
|
||||
+1
-1
@@ -73,7 +73,7 @@ build_frontend() {
|
||||
echo_info "Install dependencies..."
|
||||
pnpm install --registry https://registry.npmmirror.com --dir ./frontend
|
||||
echo_info "Build frontend..."
|
||||
cd ./frontend && pnpm run build
|
||||
cd ./frontend && pnpm run build --verbose
|
||||
cd ..
|
||||
else
|
||||
echo_info "Need not build frontend."
|
||||
|
||||
@@ -10,6 +10,11 @@ BuildMode=$3
|
||||
if [[ "${BuildMode}" == "" ]];then
|
||||
BuildMode="all"
|
||||
fi
|
||||
|
||||
if [[ "${ARCH}" == "" ]];then
|
||||
ARCH="amd64"
|
||||
fi
|
||||
|
||||
# 编译可执行文件
|
||||
./scripts/build.sh "cmd" "" "${BuildMode}" ${ARCH}
|
||||
|
||||
@@ -22,12 +27,11 @@ mkdir -p scripts/cmd/ && cp cmd/${APP} scripts/cmd/ && cp cmd/apipark_ai_event_l
|
||||
VERSION=$(gen_version)
|
||||
|
||||
|
||||
if [[ "${ARCH}" == "" ]];then
|
||||
ARCH="amd64"
|
||||
fi
|
||||
|
||||
OPTIONS=""
|
||||
if [[ "${ARCH}" == "arm" ]];then
|
||||
SYS_ARCH=$(arch)
|
||||
if [[ (${SYS_ARCH} == "aarch64" || ${SYS_ARCH} == "arm64") && $ARCH == "amd64" ]];then
|
||||
OPTIONS="--platform=linux/amd64"
|
||||
elif [[ ${SYS_ARCH} == "amd64" && $ARCH == "arm64" ]];then
|
||||
OPTIONS="--platform=linux/arm64"
|
||||
fi
|
||||
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
#!/bin/sh
|
||||
|
||||
set -e
|
||||
|
||||
source ./init_config.sh
|
||||
|
||||
OLD_IFS="$IFS"
|
||||
IFS=","
|
||||
arr=(${REDIS_ADDR})
|
||||
IFS="$OLD_IFS"
|
||||
|
||||
|
||||
@@ -21,10 +20,11 @@ echo -e "redis:" >> config.yml
|
||||
echo -e " user_name: ${REDIS_USER_NAME}" >> config.yml
|
||||
echo -e " password: ${REDIS_PWD}" >> config.yml
|
||||
echo -e " addr: " >> config.yml
|
||||
for s in ${arr[@]}
|
||||
for s in $REDIS_ADDR
|
||||
do
|
||||
echo -e " - $s" >> config.yml
|
||||
done
|
||||
|
||||
echo -e "nsq:" >> config.yml
|
||||
echo -e " addr: ${NSQ_ADDR}" >> config.yml
|
||||
echo -e " topic_prefix: ${NSQ_TOPIC_PREFIX}" >> config.yml
|
||||
@@ -38,5 +38,31 @@ echo -e " log_period: ${ERROR_PERIOD}" >> config.yml
|
||||
|
||||
cat config.yml
|
||||
nohup ./apipark >> run.log 2>&1 &
|
||||
wait_for_apipark
|
||||
|
||||
nohup ./apipark_ai_event_listen >> run.log 2>&1 &
|
||||
|
||||
if [[ ${Init} == "true" ]];then
|
||||
login_apipark
|
||||
r=$(is_init)
|
||||
if [[ $r == "true" ]];then
|
||||
echo "Already initialized, skipping initialization."
|
||||
else
|
||||
wait_for_influxdb
|
||||
|
||||
wait_for_apinto
|
||||
set_cluster
|
||||
|
||||
wait_for_influxdb
|
||||
set_influxdb
|
||||
|
||||
set_loki
|
||||
set_nsq
|
||||
set_openapi_config
|
||||
# 重启apipark
|
||||
kill -9 $(pgrep apipark)
|
||||
nohup ./apipark >> run.log 2>&1 &
|
||||
fi
|
||||
fi
|
||||
|
||||
tail -F run.log
|
||||
Executable
+292
@@ -0,0 +1,292 @@
|
||||
#!/bin/sh
|
||||
|
||||
Cookie=""
|
||||
|
||||
if [[ "$ApiparkAddress" == "" ]]; then
|
||||
ApiparkAddress="http://127.0.0.1:8288"
|
||||
fi
|
||||
if [[ "${ApintoAddress}" == "" ]]; then
|
||||
ApintoAddress="http://apipark-apinto:9400"
|
||||
fi
|
||||
if [[ "$InfluxdbAddress" == "" ]]; then
|
||||
InfluxdbAddress="http://apipark-influxdb:8086"
|
||||
fi
|
||||
|
||||
|
||||
|
||||
if [[ "$NSQAddress" == "" ]]; then
|
||||
NSQAddress="apipark-nsq:4150"
|
||||
fi
|
||||
if [[ "$LokiAddress" == "" ]]; then
|
||||
LokiAddress="http://apipark-loki:3100"
|
||||
fi
|
||||
|
||||
echo_fail() {
|
||||
printf "\e[91m✘ Error:\e[0m $@\n" >&2
|
||||
}
|
||||
|
||||
echo_pass() {
|
||||
printf "\e[92m✔ Passed:\e[0m $@\n" >&2
|
||||
}
|
||||
|
||||
echo_warn() {
|
||||
printf "\e[93m⚠ Warning:\e[0m $@\n" >&2
|
||||
}
|
||||
|
||||
echo_pause() {
|
||||
printf "\e[94m⏸ Pause:\e[0m $1\n" >&2
|
||||
}
|
||||
|
||||
echo_question() {
|
||||
printf "\e[95m? Question:\e[0m $@\n" >&2
|
||||
}
|
||||
|
||||
echo_info() {
|
||||
printf "\e[96mℹ Info:\e[0m $1\n" >&2
|
||||
}
|
||||
|
||||
echo_point() {
|
||||
printf "\e[94m➜ Point:\e[0m $1\n" >&2
|
||||
}
|
||||
|
||||
echo_bullet() {
|
||||
printf "\e[94m• Step:\e[0m $1\n" >&2
|
||||
}
|
||||
|
||||
echo_wait() {
|
||||
printf "\e[95m⏳ Waiting:\e[0m $1\n" >&2
|
||||
}
|
||||
|
||||
echo_split() {
|
||||
echo "" >&2
|
||||
echo "" >&2
|
||||
echo -e "\e[94m────────────────────────────────────────────────────────────\e[0m" >&2
|
||||
}
|
||||
|
||||
request_apipark() {
|
||||
path=$1
|
||||
body=$2
|
||||
method=$3
|
||||
if [[ "$method" == "" ]]; then
|
||||
method="POST"
|
||||
fi
|
||||
if [[ "$Cookie" == "" ]]; then
|
||||
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -d '$body' \"${ApiparkAddress}${path}\""
|
||||
echo_info "Executing: $cmd" # 打印命令
|
||||
response=$(eval "$cmd")
|
||||
else
|
||||
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -H \"Cookie: $Cookie\" -d '$body' \"${ApiparkAddress}${path}\""
|
||||
echo_info "Executing: $cmd" # 打印命令
|
||||
response=$(eval "$cmd")
|
||||
fi
|
||||
echo "$response"
|
||||
}
|
||||
|
||||
request_apinto() {
|
||||
path="$1"
|
||||
body="$2"
|
||||
echo_info "Executing: curl -i -X POST -H \"Content-Type: application/json\" \"${ApintoAddress}${path}\" -d '$body'"
|
||||
response=$(curl -i -X POST -H "Content-Type: application/json" "${ApintoAddress}${path}" -d "$body")
|
||||
status_code=$(echo "$response" | grep -E 'HTTP/[0-9.]+ [0-9]+' | awk '{print $2}' || echo "0")
|
||||
echo_info "$response"
|
||||
echo "$status_code"
|
||||
}
|
||||
|
||||
login_apipark() {
|
||||
# 执行登录请求并捕获响应头
|
||||
body='{"name":"admin","password":"'"${ADMIN_PASSWORD}"'"}'
|
||||
response=$(request_apipark "/api/v1/account/login/username" "$body")
|
||||
|
||||
# 从响应中提取 Set-Cookie 头
|
||||
cookie=$(echo "$response" | grep -i "Set-Cookie" | sed 's/Set-Cookie: //;s/;.*//')
|
||||
|
||||
# 提取 JSON 主体(假设 JSON 在最后一行或响应中可识别)
|
||||
json_body=$(echo "$response" | grep '^{.*}$')
|
||||
# 提取 code 值
|
||||
code=$(echo "$json_body" | sed 's/.*"code":\([0-9]*\).*/\1/')
|
||||
|
||||
# 检查 code 是否为 0
|
||||
if [ "$code" -eq 0 ]; then
|
||||
Cookie=$cookie
|
||||
echo_pass "login success"
|
||||
else
|
||||
echo_fail "login failed: $json_body"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
set_cluster() {
|
||||
# 设置集群地址
|
||||
body='{"manager_address":"'"${ApintoAddress}"'"}'
|
||||
path="/api/v1/cluster/reset"
|
||||
response=$(request_apipark "$path" "$body" "PUT")
|
||||
# 从响应中提取 code
|
||||
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
|
||||
if [ "$code" -eq 0 ]; then
|
||||
echo_pass "Set cluster successfully"
|
||||
else
|
||||
echo_fail "Set cluster failed: ${response}"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
set_loki() {
|
||||
# 设置 loki 地址
|
||||
body='{"config":{"url":"'"${LokiAddress}"'"}}'
|
||||
path="/api/v1/log/loki"
|
||||
response=$(request_apipark "$path" "$body")
|
||||
# 从响应中提取 code
|
||||
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
|
||||
if [ "$code" -eq 0 ]; then
|
||||
echo_pass "Set loki successfully"
|
||||
else
|
||||
echo_fail "Set loki failed: ${response}"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
set_nsq() {
|
||||
body="{
|
||||
\"address\": [
|
||||
\"${NSQAddress}\"
|
||||
],
|
||||
\"description\": \"auto init nsqd config\",
|
||||
\"driver\": \"nsqd\",
|
||||
\"formatter\": {
|
||||
\"ai\": [
|
||||
\"\$ai_provider\",
|
||||
\"\$ai_model\",
|
||||
\"\$ai_model_input_token\",
|
||||
\"\$ai_model_output_token\",
|
||||
\"\$ai_model_total_token\",
|
||||
\"\$ai_model_cost\",
|
||||
\"\$ai_provider_statuses\"
|
||||
],
|
||||
\"fields\": [
|
||||
\"\$time_iso8601\",
|
||||
\"\$request_id\",
|
||||
\"\$api\",
|
||||
\"\$provider\",
|
||||
\"@ai\"
|
||||
]
|
||||
},
|
||||
\"scopes\": [
|
||||
\"access_log\"
|
||||
],
|
||||
\"topic\": \"apipark_ai_event\",
|
||||
\"type\": \"json\"
|
||||
}"
|
||||
|
||||
status_code=$(request_apinto "/api/output/ai_event" "$body")
|
||||
echo "Status code: $status_code"
|
||||
if [ "$status_code" -eq 200 ]; then
|
||||
echo_pass "Update nsq successfully"
|
||||
else
|
||||
echo_fail "Update nsq failed: ${status_code}"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
set_influxdb() {
|
||||
if [ -z "$InfluxdbToken" ]; then
|
||||
echo_fail "Influxdb token is empty"
|
||||
exit 1
|
||||
fi
|
||||
if [ -z "$InfluxdbOrg" ]; then
|
||||
InfluxdbOrg="apipark"
|
||||
fi
|
||||
body='{"driver":"influxdb-v2","config":{"addr":"'"${InfluxdbAddress}"'","org":"'"${InfluxdbOrg}"'","token":"'${InfluxdbToken}'"}}'
|
||||
response=$(request_apipark "/api/v1/monitor/config" "$body")
|
||||
# 从响应中提取 code
|
||||
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
|
||||
if [ "$code" -eq 0 ]; then
|
||||
echo_pass "Update influxdb config successfully"
|
||||
else
|
||||
echo_fail "Update influxdb config failed: ${response}"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
retry() {
|
||||
max_wait="$1"
|
||||
shift
|
||||
cmd="$@"
|
||||
|
||||
sleep_interval=2
|
||||
curr_wait=0
|
||||
|
||||
until $cmd
|
||||
do
|
||||
if [ "$curr_wait" -ge "$max_wait" ]
|
||||
then
|
||||
echo "Command '$cmd' failed after $curr_wait seconds."
|
||||
return 1
|
||||
else
|
||||
curr_wait=$((curr_wait + sleep_interval))
|
||||
sleep "$sleep_interval"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
is_init() {
|
||||
path="/api/v1/system/general"
|
||||
method="GET"
|
||||
response=$(request_apipark "$path" "" "$method")
|
||||
# 从响应中提取 site_prefix
|
||||
site_prefix=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"site_prefix":"\{0,1\}\([^"]*\)"\{0,1\}.*/\1/')
|
||||
if [ -z "$site_prefix" ]; then
|
||||
echo_pass "No apipark openapi address set"
|
||||
echo false
|
||||
else
|
||||
echo_pass "Apipark openapi address found: $site_prefix"
|
||||
echo true
|
||||
fi
|
||||
}
|
||||
|
||||
set_openapi_config() {
|
||||
IP=$(dig +short myip.opendns.com @resolver1.opendns.com | grep -E '^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$')
|
||||
if [ -z "$IP" ]; then
|
||||
echo_fail "Failed to resolve IP address"
|
||||
exit 1
|
||||
fi
|
||||
body='{"site_prefix":"http://'"${IP}"':18288"}'
|
||||
response=$(request_apipark "/api/v1/system/general" "$body")
|
||||
# 从响应中提取 code
|
||||
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
|
||||
if [ "$code" -eq 0 ]; then
|
||||
echo_pass "Update apipark openapi address successfully"
|
||||
else
|
||||
echo_fail "Update apipark openapi address failed: ${response}"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
wait_for() {
|
||||
waitName=$1
|
||||
cmd=$2
|
||||
echo ${cmd}
|
||||
echo_wait "Waiting for ${waitName} to start..."
|
||||
retry 30 ${cmd}
|
||||
if [ $? -eq 0 ]; then
|
||||
echo_pass "${waitName} has been installed successfully"
|
||||
else
|
||||
echo_fail "${waitName} installation failed"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
}
|
||||
|
||||
wait_for_apipark() {
|
||||
wait_for "apipark" "curl -s -o /dev/null ${ApiparkAddress}/api/v1/account/login"
|
||||
}
|
||||
|
||||
wait_for_apinto() {
|
||||
wait_for "apinto" "curl -s -o /dev/null ${ApintoAddress}/api/router"
|
||||
}
|
||||
|
||||
wait_for_influxdb() {
|
||||
wait_for "influxdb" "curl -s -o /dev/null ${InfluxdbAddress}/api/v2/health"
|
||||
}
|
||||
|
||||
|
||||
|
||||
+107
-3
@@ -23,7 +23,77 @@ var (
|
||||
)
|
||||
|
||||
type imlLogService struct {
|
||||
store log_source.ILogSourceStore `autowired:""`
|
||||
store log_source.ILogSourceStore `autowired:""`
|
||||
logRecordStore log_source.ILogRecordStore `autowired:""`
|
||||
}
|
||||
|
||||
func (i *imlLogService) LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) {
|
||||
list, total, err := i.logRecordStore.ListPage(ctx, "`record_time` between ? and ? and `service` = ?", page, size, []interface{}{
|
||||
start,
|
||||
end,
|
||||
serviceId,
|
||||
}, "record_time desc")
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return utils.SliceToSlice(list, func(s *log_source.LogRecord) *Item {
|
||||
return &Item{
|
||||
ID: s.UUID,
|
||||
Strategy: s.Strategy,
|
||||
Service: s.Service,
|
||||
API: s.API,
|
||||
Method: s.Method,
|
||||
Url: s.Url,
|
||||
RemoteIP: s.RemoteIP,
|
||||
Consumer: s.Consumer,
|
||||
Authorization: s.Authorization,
|
||||
InputToken: s.InputToken,
|
||||
OutputToken: s.OutputToken,
|
||||
TotalToken: s.TotalToken,
|
||||
AIProvider: s.AIProvider,
|
||||
AIModel: s.AIModel,
|
||||
StatusCode: s.StatusCode,
|
||||
ResponseTime: s.ResponseTime,
|
||||
Traffic: s.Traffic,
|
||||
RecordTime: s.RecordTime,
|
||||
}
|
||||
}), total, nil
|
||||
|
||||
}
|
||||
|
||||
func (i *imlLogService) InsertLog(ctx context.Context, driver string, input *InsertLog) error {
|
||||
// 判断日志是否已存在,若已存在,则不插入
|
||||
_, err := i.logRecordStore.First(ctx, map[string]interface{}{"uuid": input.ID})
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
log_print.Errorf("get log record %s error: %s", input.ID, err)
|
||||
return err
|
||||
}
|
||||
return i.logRecordStore.Insert(ctx, &log_source.LogRecord{
|
||||
UUID: input.ID,
|
||||
Driver: input.Driver,
|
||||
Service: input.Service,
|
||||
API: input.API,
|
||||
Strategy: input.Strategy,
|
||||
Method: input.Method,
|
||||
Url: input.Url,
|
||||
RemoteIP: input.RemoteIP,
|
||||
Consumer: input.Consumer,
|
||||
Authorization: input.Authorization,
|
||||
InputToken: input.InputToken,
|
||||
OutputToken: input.OutputToken,
|
||||
TotalToken: input.TotalToken,
|
||||
AIProvider: input.AIProvider,
|
||||
AIModel: input.AIModel,
|
||||
StatusCode: input.StatusCode,
|
||||
ResponseTime: input.ResponseTime,
|
||||
|
||||
Traffic: input.Traffic,
|
||||
RecordTime: input.RecordTime,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (i *imlLogService) OnComplete() {
|
||||
@@ -67,9 +137,10 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
|
||||
if input.Config == nil || *input.Config == "" {
|
||||
return errors.New("config is required")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
userId := utils.UserId(ctx)
|
||||
s = &log_source.Log{
|
||||
s = &log_source.LogSource{
|
||||
UUID: input.ID,
|
||||
Cluster: *input.Cluster,
|
||||
Driver: driver,
|
||||
@@ -79,11 +150,19 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
|
||||
CreateAt: now,
|
||||
UpdateAt: now,
|
||||
}
|
||||
if input.LastPullTime == nil {
|
||||
s.LastPullAt = time.Now().Add(-24 * time.Hour)
|
||||
} else {
|
||||
s.LastPullAt = *input.LastPullTime
|
||||
}
|
||||
|
||||
} else {
|
||||
if input.Config != nil && *input.Config != "" {
|
||||
s.Config = *input.Config
|
||||
}
|
||||
if input.LastPullTime != nil {
|
||||
s.LastPullAt = *input.LastPullTime
|
||||
}
|
||||
s.Updater = utils.UserId(ctx)
|
||||
s.UpdateAt = time.Now()
|
||||
}
|
||||
@@ -129,6 +208,10 @@ func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string,
|
||||
return result, count, nil
|
||||
}
|
||||
|
||||
func (i *imlLogService) LogRecords(ctx context.Context, driver string, keyword string, start time.Time, end time.Time) ([]*Item, int64, error) {
|
||||
panic(errors.New("not implemented"))
|
||||
}
|
||||
|
||||
func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) {
|
||||
d, has := log_driver.GetDriver(driver)
|
||||
if !has {
|
||||
@@ -147,11 +230,32 @@ func (i *imlLogService) LogInfo(ctx context.Context, driver string, cluster stri
|
||||
return nil, err
|
||||
}
|
||||
return &Info{
|
||||
ID: info.ID,
|
||||
Item: Item{
|
||||
ID: info.ID,
|
||||
Strategy: info.Strategy,
|
||||
Service: info.Service,
|
||||
API: info.API,
|
||||
Method: info.Method,
|
||||
Url: info.Url,
|
||||
RemoteIP: info.RemoteIP,
|
||||
Consumer: info.Consumer,
|
||||
Authorization: info.Authorization,
|
||||
InputToken: info.InputToken,
|
||||
OutputToken: info.OutputToken,
|
||||
TotalToken: info.TotalToken,
|
||||
AIProvider: info.AIProvider,
|
||||
AIModel: info.AIModel,
|
||||
StatusCode: info.StatusCode,
|
||||
ResponseTime: info.ResponseTime,
|
||||
Traffic: info.Traffic,
|
||||
RecordTime: info.RecordTime,
|
||||
},
|
||||
ContentType: info.ContentType,
|
||||
RequestBody: info.RequestBody,
|
||||
ProxyBody: info.ProxyBody,
|
||||
ProxyResponseBody: info.ProxyResponseBody,
|
||||
ResponseBody: info.ResponseBody,
|
||||
RequestHeader: info.RequestHeader,
|
||||
ResponseHeader: info.ResponseHeader,
|
||||
}, nil
|
||||
}
|
||||
|
||||
+59
-22
@@ -7,51 +7,88 @@ import (
|
||||
)
|
||||
|
||||
type Save struct {
|
||||
ID string
|
||||
Cluster *string
|
||||
Config *string
|
||||
ID string
|
||||
Cluster *string
|
||||
Config *string
|
||||
LastPullTime *time.Time
|
||||
}
|
||||
|
||||
type Source struct {
|
||||
ID string
|
||||
Cluster string
|
||||
Driver string
|
||||
Config string
|
||||
Creator string
|
||||
Updater string
|
||||
CreateAt time.Time
|
||||
UpdateAt time.Time
|
||||
ID string
|
||||
Cluster string
|
||||
Driver string
|
||||
Config string
|
||||
Creator string
|
||||
Updater string
|
||||
CreateAt time.Time
|
||||
UpdateAt time.Time
|
||||
LastPullTime time.Time
|
||||
}
|
||||
|
||||
func FromEntity(ov *log_source.Log) *Source {
|
||||
func FromEntity(ov *log_source.LogSource) *Source {
|
||||
return &Source{
|
||||
ID: ov.UUID,
|
||||
Cluster: ov.Cluster,
|
||||
Driver: ov.Driver,
|
||||
Config: ov.Config,
|
||||
Creator: ov.Creator,
|
||||
Updater: ov.Updater,
|
||||
CreateAt: ov.CreateAt,
|
||||
UpdateAt: ov.UpdateAt,
|
||||
ID: ov.UUID,
|
||||
Cluster: ov.Cluster,
|
||||
Driver: ov.Driver,
|
||||
Config: ov.Config,
|
||||
Creator: ov.Creator,
|
||||
Updater: ov.Updater,
|
||||
LastPullTime: ov.LastPullAt,
|
||||
CreateAt: ov.CreateAt,
|
||||
UpdateAt: ov.UpdateAt,
|
||||
}
|
||||
}
|
||||
|
||||
type Item struct {
|
||||
type InsertLog struct {
|
||||
ID string
|
||||
Driver string
|
||||
Strategy string
|
||||
Service string
|
||||
API string
|
||||
Method string
|
||||
Url string
|
||||
RemoteIP string
|
||||
Consumer string
|
||||
Authorization string
|
||||
InputToken int64
|
||||
OutputToken int64
|
||||
TotalToken int64
|
||||
AIProvider string
|
||||
AIModel string
|
||||
StatusCode int64
|
||||
ResponseTime int64
|
||||
Traffic int64
|
||||
RecordTime time.Time
|
||||
}
|
||||
|
||||
type Item struct {
|
||||
ID string
|
||||
Strategy string
|
||||
Service string
|
||||
API string
|
||||
Method string
|
||||
Url string
|
||||
RemoteIP string
|
||||
Consumer string
|
||||
Authorization string
|
||||
InputToken int64
|
||||
OutputToken int64
|
||||
TotalToken int64
|
||||
AIProvider string
|
||||
AIModel string
|
||||
StatusCode int64
|
||||
ResponseTime int64
|
||||
Traffic int64
|
||||
RecordTime time.Time
|
||||
}
|
||||
|
||||
type Info struct {
|
||||
ID string
|
||||
Item
|
||||
ContentType string
|
||||
RequestBody string
|
||||
ProxyBody string
|
||||
ProxyResponseBody string
|
||||
ResponseBody string
|
||||
RequestHeader string
|
||||
ResponseHeader string
|
||||
}
|
||||
|
||||
@@ -13,8 +13,14 @@ type ILogService interface {
|
||||
UpdateLogSource(ctx context.Context, driver string, input *Save) error
|
||||
GetLogSource(ctx context.Context, driver string) (*Source, error)
|
||||
Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error)
|
||||
LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error)
|
||||
LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
|
||||
LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error)
|
||||
InsertLog(ctx context.Context, driver string, input *InsertLog) error
|
||||
}
|
||||
|
||||
type ILogUpdateService interface {
|
||||
UpdateLogSource(ctx context.Context, driver string, input *Save) error
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -157,3 +157,29 @@ type MonTrendValues struct {
|
||||
Names []string
|
||||
Values [][]interface{}
|
||||
}
|
||||
|
||||
type StatusCodeOverview struct {
|
||||
Status2xx int64
|
||||
Status4xx int64
|
||||
Status5xx int64
|
||||
StatusTotal int64
|
||||
}
|
||||
|
||||
type TokenOverview struct {
|
||||
InputToken int64
|
||||
OutputToken int64
|
||||
TotalToken int64
|
||||
}
|
||||
|
||||
type TopN struct {
|
||||
Key string
|
||||
Request int64
|
||||
Token int64
|
||||
Traffic int64
|
||||
}
|
||||
|
||||
type Aggregate struct {
|
||||
Max int64
|
||||
Min int64
|
||||
Avg int64
|
||||
}
|
||||
|
||||
+44
-12
@@ -2,22 +2,54 @@ package log_source
|
||||
|
||||
import "time"
|
||||
|
||||
type Log struct {
|
||||
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
|
||||
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
|
||||
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
|
||||
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
|
||||
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
|
||||
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
|
||||
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
|
||||
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
|
||||
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
|
||||
type LogSource struct {
|
||||
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
|
||||
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
|
||||
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
|
||||
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
|
||||
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
|
||||
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
|
||||
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
|
||||
LastPullAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:last_pull_at;comment:最后拉取时间"`
|
||||
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
|
||||
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
|
||||
}
|
||||
|
||||
func (c *Log) IdValue() int64 {
|
||||
func (c *LogSource) IdValue() int64 {
|
||||
return c.Id
|
||||
}
|
||||
|
||||
func (c *Log) TableName() string {
|
||||
func (c *LogSource) TableName() string {
|
||||
return "log"
|
||||
}
|
||||
|
||||
type LogRecord struct {
|
||||
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
|
||||
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
|
||||
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
|
||||
Service string `gorm:"column:service;type:varchar(36);NOT NULL;comment:服务ID"`
|
||||
API string `gorm:"column:api;type:varchar(36);NOT NULL;comment:接口ID"`
|
||||
Strategy string `gorm:"column:strategy;type:varchar(36);NOT NULL;comment:策略ID"`
|
||||
Method string `gorm:"column:method;type:varchar(36);NOT NULL;comment:请求方法"`
|
||||
Url string `gorm:"column:url;type:varchar(255);NOT NULL;comment:请求URL"`
|
||||
RemoteIP string `gorm:"column:remote_ip;type:varchar(255);NOT NULL;comment:请求IP"`
|
||||
Consumer string `gorm:"column:consumer;type:varchar(255);NOT NULL;comment:消费者ID"`
|
||||
Authorization string `gorm:"column:authorization;type:varchar(255);NOT NULL;comment:鉴权ID"`
|
||||
InputToken int64 `gorm:"column:input_token;type:int(11);NOT NULL;comment:输入令牌"`
|
||||
OutputToken int64 `gorm:"column:output_token;type:int(11);NOT NULL;comment:输出令牌"`
|
||||
TotalToken int64 `gorm:"column:total_token;type:int(11);NOT NULL;comment:总令牌"`
|
||||
AIProvider string `gorm:"column:ai_provider;type:varchar(255);NOT NULL;comment:AI提供商"`
|
||||
AIModel string `gorm:"column:ai_model;type:varchar(255);NOT NULL;comment:AI模型"`
|
||||
StatusCode int64 `gorm:"column:status_code;type:int(11);NOT NULL;comment:请求状态码"`
|
||||
ResponseTime int64 `gorm:"column:response_time;type:int(11);NOT NULL;comment:响应时间"`
|
||||
Traffic int64 `gorm:"column:traffic;type:BIGINT(20);NOT NULL;comment:流量"`
|
||||
RecordTime time.Time `gorm:"column:record_time;type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;comment:记录时间"`
|
||||
}
|
||||
|
||||
func (c *LogRecord) IdValue() int64 {
|
||||
return c.Id
|
||||
}
|
||||
|
||||
func (c *LogRecord) TableName() string {
|
||||
return "log_record"
|
||||
}
|
||||
|
||||
@@ -8,15 +8,26 @@ import (
|
||||
)
|
||||
|
||||
type ILogSourceStore interface {
|
||||
store.IBaseStore[Log]
|
||||
store.IBaseStore[LogSource]
|
||||
}
|
||||
|
||||
type storeLogSource struct {
|
||||
store.Store[Log]
|
||||
store.Store[LogSource]
|
||||
}
|
||||
|
||||
type ILogRecordStore interface {
|
||||
store.IBaseStore[LogRecord]
|
||||
}
|
||||
|
||||
type storeLogRecord struct {
|
||||
store.Store[LogRecord]
|
||||
}
|
||||
|
||||
func init() {
|
||||
autowire.Auto[ILogSourceStore](func() reflect.Value {
|
||||
return reflect.ValueOf(new(storeLogSource))
|
||||
})
|
||||
autowire.Auto[ILogRecordStore](func() reflect.Value {
|
||||
return reflect.ValueOf(new(storeLogRecord))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user