finish service log module

This commit is contained in:
Liujian
2025-04-29 19:08:02 +08:00
parent a3bebde83c
commit 604a8312ef
15 changed files with 346 additions and 38 deletions
+8
View File
@@ -84,6 +84,14 @@ type imlServiceController struct {
transaction store.ITransaction `autowired:""`
}
func (i *imlServiceController) RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
return i.module.RestLogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
return i.module.AILogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
+2
View File
@@ -45,6 +45,8 @@ type IServiceController interface {
AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IAppController interface {
+2 -2
View File
@@ -9,8 +9,8 @@ import (
type ILogDriver interface {
LogInfo(clusterId string, id string) (*LogInfo, error)
LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error)
LogRecords(clusterId string, start time.Time, end time.Time) ([]*Log, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*LogItem, int64, error)
LogRecords(clusterId string, start time.Time, end time.Time) ([]*LogItem, error)
}
var (
+4 -2
View File
@@ -4,7 +4,7 @@ import (
"time"
)
type Log struct {
type LogItem struct {
ID string
Strategy string
Service string
@@ -26,10 +26,12 @@ type Log struct {
}
type LogInfo struct {
ID string
*LogItem
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}
+2
View File
@@ -63,11 +63,13 @@ type LogDetail struct {
RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"`
RequestHeader string `json:"request_header"`
RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"`
RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"`
ResponseHeader string `json:"response_header"`
ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"`
Provider string `json:"provider"`
+33 -27
View File
@@ -81,13 +81,16 @@ func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, erro
return nil, fmt.Errorf("no log found")
}
stream := list[0].Stream
msec, _ := strconv.ParseInt(stream.Msec, 10, 64)
return &log_driver.LogInfo{
ID: stream.RequestId,
LogItem: ToLogItem(stream, msec),
ContentType: stream.ContentType,
RequestBody: stream.RequestBody,
ProxyBody: stream.ProxyBody,
ProxyResponseBody: stream.ProxyResponseBody,
ResponseBody: stream.ResponseBody,
RequestHeader: stream.RequestHeader,
ResponseHeader: stream.ResponseHeader,
}, nil
}
@@ -132,7 +135,7 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH
return result, nil
}
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.Log, error) {
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.LogItem, error) {
if start.After(end) {
return nil, fmt.Errorf("start time is greater than end time")
}
@@ -150,7 +153,7 @@ func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([
return logs, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.LogItem, int64, error) {
if start.After(end) {
return nil, 0, fmt.Errorf("start time is greater than end time")
}
@@ -195,7 +198,30 @@ func (d *Driver) Logs(clusterId string, conditions map[string]string, start time
return logs, count, nil
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) {
func ToLogItem(detail *LogDetail, msec int64) *log_driver.LogItem {
return &log_driver.LogItem{
ID: detail.RequestId,
Strategy: detail.Strategy,
Service: detail.Provider,
API: detail.Api,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
InputToken: parseToInt64(detail.AIModelInputToken),
OutputToken: parseToInt64(detail.AIModelOutputToken),
TotalToken: parseToInt64(detail.AIModelTotalToken),
AIProvider: detail.AIProvider,
AIModel: detail.AIModel,
StatusCode: parseToInt64(detail.Status),
ResponseTime: parseToInt64(detail.RequestTime),
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
RecordTime: time.UnixMilli(msec),
}
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.LogItem, error) {
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
if err != nil {
@@ -216,33 +242,13 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
}
return d.recuseLogs(queries, time.UnixMilli(msec), offset-1)
}
logs := make([]*log_driver.Log, 0, len(list))
logs := make([]*log_driver.LogItem, 0, len(list))
for _, l := range list {
if l.Stream == nil {
continue
}
detail := l.Stream
msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
logs = append(logs, &log_driver.Log{
ID: detail.RequestId,
Strategy: detail.Strategy,
Service: detail.Provider,
API: detail.Api,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
InputToken: parseToInt64(detail.AIModelInputToken),
OutputToken: parseToInt64(detail.AIModelOutputToken),
TotalToken: parseToInt64(detail.AIModelTotalToken),
AIProvider: detail.AIProvider,
AIModel: detail.AIModel,
StatusCode: parseToInt64(detail.Status),
ResponseTime: parseToInt64(detail.RequestTime),
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
RecordTime: time.UnixMilli(msec),
})
msec, _ := strconv.ParseInt(l.Stream.Msec, 10, 64)
logs = append(logs, ToLogItem(l.Stream, msec))
}
sort.Slice(logs, func(i, j int) bool {
return logs[i].RecordTime.After(logs[j].RecordTime)
+5 -5
View File
@@ -44,12 +44,12 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
// t.Log(time.Now().Sub(a))
// t.LogItem(time.Now().Sub(a))
// data, err := json.Marshal(result)
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogCount(t *testing.T) {
@@ -67,7 +67,7 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogs(t *testing.T) {
@@ -83,7 +83,7 @@ func TestLoki(t *testing.T) {
// queries.Set("limit", "1")
// now = time.Now()
// result, err := send[map[string]interface{}](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "")
// t.Log(time.Now().Sub(now))
// t.LogItem(time.Now().Sub(now))
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
@@ -91,5 +91,5 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
+1
View File
@@ -62,6 +62,7 @@ var logFormatter = map[string]interface{}{
"$proxy_host",
"$proxy_header",
"$proxy_addr",
"$response_header",
"$response_headers",
"$status",
"$content_type",
+39
View File
@@ -259,3 +259,42 @@ type RestLogItem struct {
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
}
type RestLogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginRequest `json:"request"`
Response OriginRequest `json:"response"`
}
type AILogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginAIRequest `json:"request"`
Response OriginAIRequest `json:"response"`
}
type OriginRequest struct {
Header string `json:"header"`
Origin string `json:"origin"`
}
type OriginAIRequest struct {
OriginRequest
Body string `json:"body"`
Token int64 `json:"token"`
}
+106
View File
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"time"
@@ -95,6 +96,111 @@ type imlServiceModule struct {
transaction store.ITransaction `autowired:""`
}
func formatHeader(header string) string {
result, err := url.QueryUnescape(header)
if err != nil {
return header
}
result = strings.ReplaceAll(result, "&", "\n")
result = strings.ReplaceAll(result, "=", ": ")
return result
}
func (i *imlServiceModule) RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
logInfo := &service_dto.RestLogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
ResponseTime: common.FormatTime(info.ResponseTime),
Traffic: common.FormatByte(info.Traffic),
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
},
Response: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
response, err := parseAIResponse(info.ResponseBody)
if err != nil {
response = info.ResponseBody
}
logInfo := &service_dto.AILogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
Provider: auto.UUID(info.AIProvider),
Model: info.AIModel,
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
},
Body: parseAIRequest(info.RequestBody),
Token: info.TotalToken,
},
Response: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
},
Body: response,
Token: info.TotalToken,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
+2
View File
@@ -42,6 +42,8 @@ type IServiceModule interface {
type ILogModule interface {
AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IServiceDocModule interface {
+115
View File
@@ -0,0 +1,115 @@
package service
import (
"bufio"
"encoding/json"
"strings"
)
// ChatCompletionChunk represents the structure of a single chunk in the streaming response
type ChatCompletionChunk struct {
Object string `json:"object"`
Choices []Choice `json:"choices"`
}
// ChatCompletion represents the structure of a non-streaming response
type ChatCompletion struct {
Object string `json:"object"`
Choices []FullChoice `json:"choices"`
}
// Choice represents a choice in the streaming chunk
type Choice struct {
Delta Delta `json:"delta"`
FinishReason *string `json:"finish_reason"`
}
// FullChoice represents a choice in the non-streaming response
type FullChoice struct {
Message Message `json:"message"`
}
// Delta represents the delta content in a streaming choice
type Delta struct {
Content string `json:"content"`
Role string `json:"role,omitempty"`
}
// Message represents the message content in a non-streaming choice
type Message struct {
Content string `json:"content"`
Role string `json:"role"`
}
// ParseAIResponse parses both streaming and non-streaming AI responses and returns the concatenated content
func parseAIResponse(input string) (string, error) {
// First, try to parse as a non-streaming response
var nonStreaming ChatCompletion
if err := json.Unmarshal([]byte(input), &nonStreaming); err == nil && nonStreaming.Object == "chat.completion" {
var result strings.Builder
for _, choice := range nonStreaming.Choices {
result.WriteString(choice.Message.Content)
}
return result.String(), nil
}
// If not non-streaming, parse as streaming response
var result strings.Builder
scanner := bufio.NewScanner(strings.NewReader(input))
for scanner.Scan() {
line := scanner.Text()
// Skip empty lines or [DONE]
if line == "" || line == "data: [DONE]" {
continue
}
// Check if line starts with "data: "
if !strings.HasPrefix(line, "data: ") {
continue
}
// Extract JSON data
jsonData := strings.TrimPrefix(line, "data: ")
var chunk ChatCompletionChunk
if err := json.Unmarshal([]byte(jsonData), &chunk); err != nil {
return "", err
}
// Process each choice
for _, choice := range chunk.Choices {
// Append content from delta
result.WriteString(choice.Delta.Content)
// Check if this is the final chunk
if choice.FinishReason != nil && *choice.FinishReason == "stop" {
return result.String(), nil
}
}
}
if err := scanner.Err(); err != nil {
return "", err
}
return result.String(), nil
}
func parseAIRequest(ori string) string {
type aiRequest struct {
Messages []struct {
Role string `json:"role"`
Content string `json:"content"`
} `json:"messages"`
}
var req aiRequest
err := json.Unmarshal([]byte(ori), &req)
if err != nil {
return ori
}
size := len(req.Messages)
if size == 0 {
return ""
}
return req.Messages[size-1].Content
}
+2
View File
@@ -47,5 +47,7 @@ func (p *plugin) ServiceApis() []pm3.Api {
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/rest", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.RestLogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/ai", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.AILogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
}
}
+22 -1
View File
@@ -230,11 +230,32 @@ func (i *imlLogService) LogInfo(ctx context.Context, driver string, cluster stri
return nil, err
}
return &Info{
ID: info.ID,
Item: Item{
ID: info.ID,
Strategy: info.Strategy,
Service: info.Service,
API: info.API,
Method: info.Method,
Url: info.Url,
RemoteIP: info.RemoteIP,
Consumer: info.Consumer,
Authorization: info.Authorization,
InputToken: info.InputToken,
OutputToken: info.OutputToken,
TotalToken: info.TotalToken,
AIProvider: info.AIProvider,
AIModel: info.AIModel,
StatusCode: info.StatusCode,
ResponseTime: info.ResponseTime,
Traffic: info.Traffic,
RecordTime: info.RecordTime,
},
ContentType: info.ContentType,
RequestBody: info.RequestBody,
ProxyBody: info.ProxyBody,
ProxyResponseBody: info.ProxyResponseBody,
ResponseBody: info.ResponseBody,
RequestHeader: info.RequestHeader,
ResponseHeader: info.ResponseHeader,
}, nil
}
+3 -1
View File
@@ -83,10 +83,12 @@ type Item struct {
}
type Info struct {
ID string
Item
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}