mirror of
https://github.com/APIParkLab/APIPark.git
synced 2026-06-14 20:41:15 +08:00
297 lines
8.1 KiB
Go
297 lines
8.1 KiB
Go
package loki
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/eolinker/eosc/log"
|
|
|
|
log_driver "github.com/APIParkLab/APIPark/log-driver"
|
|
)
|
|
|
|
func init() {
|
|
log_driver.RegisterFactory("loki", &factory{})
|
|
}
|
|
|
|
type factory struct {
|
|
}
|
|
|
|
func (f *factory) Create(config string) (log_driver.ILogDriver, map[string]interface{}, error) {
|
|
|
|
return NewDriver(config)
|
|
}
|
|
|
|
var (
|
|
client = http.Client{}
|
|
)
|
|
|
|
type Driver struct {
|
|
url string
|
|
headers map[string]string
|
|
}
|
|
|
|
func NewDriver(config string) (*Driver, map[string]interface{}, error) {
|
|
cfg := new(DriverConfig)
|
|
err := json.Unmarshal([]byte(config), cfg)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
err = cfg.Check()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
headers := map[string]string{}
|
|
for _, h := range cfg.Header {
|
|
headers[h.Key] = h.Value
|
|
}
|
|
return &Driver{
|
|
url: cfg.URL,
|
|
headers: headers,
|
|
}, map[string]interface{}{
|
|
"url": cfg.URL,
|
|
"headers": headers,
|
|
}, nil
|
|
}
|
|
|
|
func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, error) {
|
|
if id == "" {
|
|
return nil, fmt.Errorf("id is empty")
|
|
}
|
|
queries := url.Values{}
|
|
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json | request_id = `%s`", clusterId, id))
|
|
now := time.Now()
|
|
start := now.Add(-time.Hour * 24 * 30)
|
|
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
|
|
queries.Set("end", strconv.FormatInt(now.UnixNano(), 10))
|
|
queries.Set("limit", "1")
|
|
log.Debug("query is ", queries.Get("query"))
|
|
|
|
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(list) < 1 || list[0].Stream == nil {
|
|
return nil, fmt.Errorf("no log found")
|
|
}
|
|
stream := list[0].Stream
|
|
return &log_driver.LogInfo{
|
|
ID: stream.RequestId,
|
|
ContentType: stream.ContentType,
|
|
RequestBody: stream.RequestBody,
|
|
ProxyBody: stream.ProxyBody,
|
|
ProxyResponseBody: stream.ProxyResponseBody,
|
|
ResponseBody: stream.ResponseBody,
|
|
}, nil
|
|
}
|
|
|
|
func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) {
|
|
|
|
cs := make([]string, 0, len(conditions))
|
|
for k, v := range conditions {
|
|
if strings.HasPrefix(k, "#") {
|
|
cs = append(cs, v)
|
|
continue
|
|
}
|
|
cs = append(cs, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
}
|
|
tmpCondition := ""
|
|
if len(conditions) > 0 {
|
|
tmpCondition = "|" + strings.Join(cs, "|")
|
|
}
|
|
queries := url.Values{}
|
|
queries.Set("query", fmt.Sprintf("sum(count_over_time({cluster=\"%s\"} | json %s [%dh])) by (%s)", clusterId, tmpCondition, spendHour, group))
|
|
sendRequestTime := time.Now()
|
|
list, err := send[LogCount](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query", d.url), d.headers, queries, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.DebugF("send request spend time: %v", time.Now().Sub(sendRequestTime))
|
|
log.Debug("query is ", queries.Get("query"))
|
|
result := make(map[string]int64)
|
|
for _, l := range list {
|
|
if len(l.Value) != 2 {
|
|
continue
|
|
}
|
|
value, ok := l.Value[1].(string)
|
|
if !ok {
|
|
continue
|
|
}
|
|
v, err := strconv.ParseInt(value, 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
result[l.Metric[group]] = v
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
|
|
if start.After(end) {
|
|
return nil, 0, fmt.Errorf("start time is greater than end time")
|
|
}
|
|
if len(conditions) < 1 {
|
|
return nil, 0, fmt.Errorf("conditions is empty")
|
|
}
|
|
if offset < 1 {
|
|
offset = 1
|
|
}
|
|
if limit < 1 {
|
|
limit = 15
|
|
}
|
|
count, err := d.logCount(clusterId, conditions, start, end)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if count == 0 {
|
|
return nil, 0, nil
|
|
}
|
|
if count < (offset-1)*limit {
|
|
return nil, 0, fmt.Errorf("offset is greater than count")
|
|
}
|
|
cs := make([]string, 0, len(conditions))
|
|
for k, v := range conditions {
|
|
if strings.HasPrefix(k, "#") {
|
|
cs = append(cs, v)
|
|
continue
|
|
}
|
|
cs = append(cs, fmt.Sprintf("%s=~\"%s\"", k, v))
|
|
}
|
|
queries := url.Values{}
|
|
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json | %s", clusterId, strings.Join(cs, " | ")))
|
|
queries.Set("limit", strconv.FormatInt(limit, 10))
|
|
queries.Set("direction", "backward")
|
|
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
|
|
log.Debug("query is ", queries.Get("query"))
|
|
logs, err := d.recuseLogs(queries, end, offset)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return logs, count, nil
|
|
}
|
|
|
|
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) {
|
|
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
|
|
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(list) < 1 {
|
|
return nil, nil
|
|
}
|
|
if offset > 1 {
|
|
// 获取list最后一个元素的时间戳
|
|
last := list[len(list)-1].Stream
|
|
if last == nil {
|
|
return nil, fmt.Errorf("last log is empty")
|
|
}
|
|
msec, err := strconv.ParseInt(last.Msec, 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse last log time error: %v", err)
|
|
}
|
|
return d.recuseLogs(queries, time.UnixMilli(msec), offset-1)
|
|
}
|
|
logs := make([]*log_driver.Log, 0, len(list))
|
|
for _, l := range list {
|
|
if l.Stream == nil {
|
|
continue
|
|
}
|
|
detail := l.Stream
|
|
msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
|
|
|
|
logs = append(logs, &log_driver.Log{
|
|
ID: detail.RequestId,
|
|
Service: detail.Provider,
|
|
Method: detail.RequestMethod,
|
|
Url: detail.RequestUri,
|
|
RemoteIP: detail.SrcIp,
|
|
Consumer: detail.Application,
|
|
Authorization: detail.Authorization,
|
|
RecordTime: time.UnixMilli(msec),
|
|
})
|
|
}
|
|
sort.Slice(logs, func(i, j int) bool {
|
|
return logs[i].RecordTime.After(logs[j].RecordTime)
|
|
})
|
|
return logs, nil
|
|
}
|
|
|
|
func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) {
|
|
// 先查在这段时间内符合条件的日志数量
|
|
queries := url.Values{}
|
|
queries.Add("start", strconv.FormatInt(start.UnixNano(), 10))
|
|
queries.Add("end", strconv.FormatInt(end.UnixNano(), 10))
|
|
cs := make([]string, 0, len(conditions))
|
|
for k, v := range conditions {
|
|
if strings.HasPrefix(k, "#") {
|
|
cs = append(cs, v)
|
|
continue
|
|
}
|
|
cs = append(cs, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
}
|
|
tmpCondition := ""
|
|
if len(conditions) > 0 {
|
|
tmpCondition = "|" + strings.Join(cs, "|")
|
|
}
|
|
queries.Set("query", fmt.Sprintf("sum(count_over_time({cluster=\"%s\"} | json %s [720h]))", clusterId, tmpCondition))
|
|
list, err := send[LogCount](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query", d.url), d.headers, queries, "")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if len(list) < 1 || len(list[0].Value) < 2 {
|
|
return 0, nil
|
|
}
|
|
value, ok := list[0].Value[1].(string)
|
|
if !ok {
|
|
return 0, nil
|
|
}
|
|
v, err := strconv.ParseInt(value, 10, 64)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
func send[T any](method string, uri string, headers map[string]string, queries url.Values, body string) ([]*T, error) {
|
|
if queries != nil && len(queries) > 0 {
|
|
uri = fmt.Sprintf("%s?%s", uri, queries.Encode())
|
|
}
|
|
req, err := http.NewRequest(method, uri, strings.NewReader(body))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w,uri is %s", err, uri)
|
|
}
|
|
for key, value := range headers {
|
|
req.Header.Set(key, value)
|
|
}
|
|
log.DebugF("do request: %s", uri)
|
|
doRequestTime := time.Now()
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to send request: %w", err)
|
|
}
|
|
log.DebugF("do request spend time: %v", time.Now().Sub(doRequestTime))
|
|
defer resp.Body.Close()
|
|
respData, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read response: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode > 399 {
|
|
return nil, fmt.Errorf("failed to send request: %s,body is %s", resp.Status, string(respData))
|
|
}
|
|
|
|
result := new(Response[T])
|
|
err = json.Unmarshal(respData, result)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w,body is %s", err, string(respData))
|
|
}
|
|
return result.Data.Result, nil
|
|
}
|