mirror of
https://github.com/APIParkLab/APIPark.git
synced 2026-06-04 10:13:53 +08:00
update loki publish
This commit is contained in:
@@ -20,7 +20,6 @@ func NewDynamicClient(client admin_client.Client, resource string) (*DynamicClie
|
||||
cfg, has := gateway.GetDynamicResourceDriver(resource)
|
||||
if !has {
|
||||
return nil, errors.New("resource not found")
|
||||
|
||||
}
|
||||
|
||||
return &DynamicClient{client: client, profession: cfg.Profession, driver: cfg.Driver}, nil
|
||||
|
||||
+4
-36
@@ -57,42 +57,10 @@ var dynamicResourceMap = map[string]Worker{
|
||||
Profession: ProfessionCertificate,
|
||||
Driver: "server",
|
||||
},
|
||||
//"openai": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "openai",
|
||||
//},
|
||||
//"google": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "google",
|
||||
//},
|
||||
//"anthropic": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "anthropic",
|
||||
//},
|
||||
//"moonshot": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "moonshot",
|
||||
//},
|
||||
//"tongyi": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "tongyi",
|
||||
//},
|
||||
//"zhipuai": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "zhipuai",
|
||||
//},
|
||||
//"fireworks": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "fireworks",
|
||||
//},
|
||||
//"novita": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "novita",
|
||||
//},
|
||||
//"mistralai": {
|
||||
// Profession: ProfessionAIProvider,
|
||||
// Driver: "mistralai",
|
||||
//},
|
||||
"loki": {
|
||||
Profession: ProfessionOutput,
|
||||
Driver: "loki",
|
||||
},
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
|
||||
@@ -7,7 +7,7 @@ var (
|
||||
)
|
||||
|
||||
type IFactory interface {
|
||||
Create(config string) (ILogDriver, error)
|
||||
Create(config string) (ILogDriver, map[string]interface{}, error)
|
||||
}
|
||||
|
||||
type factoryManager struct {
|
||||
|
||||
+10
-7
@@ -21,7 +21,7 @@ func init() {
|
||||
type factory struct {
|
||||
}
|
||||
|
||||
func (f *factory) Create(config string) (log_driver.ILogDriver, error) {
|
||||
func (f *factory) Create(config string) (log_driver.ILogDriver, map[string]interface{}, error) {
|
||||
|
||||
return NewDriver(config)
|
||||
}
|
||||
@@ -35,24 +35,27 @@ type Driver struct {
|
||||
headers map[string]string
|
||||
}
|
||||
|
||||
func NewDriver(config string) (*Driver, error) {
|
||||
func NewDriver(config string) (*Driver, map[string]interface{}, error) {
|
||||
cfg := new(DriverConfig)
|
||||
err := json.Unmarshal([]byte(config), cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
err = cfg.Check()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
headers := map[string]string{}
|
||||
for _, h := range cfg.Header {
|
||||
headers[h.Key] = h.Value
|
||||
}
|
||||
return &Driver{
|
||||
url: cfg.URL,
|
||||
headers: headers,
|
||||
}, nil
|
||||
url: cfg.URL,
|
||||
headers: headers,
|
||||
}, map[string]interface{}{
|
||||
"url": cfg.URL,
|
||||
"headers": headers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, error) {
|
||||
|
||||
+104
-5
@@ -4,6 +4,13 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
log_driver "github.com/APIParkLab/APIPark/log-driver"
|
||||
|
||||
"github.com/APIParkLab/APIPark/gateway"
|
||||
|
||||
"github.com/eolinker/go-common/store"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
@@ -18,10 +25,58 @@ import (
|
||||
var _ ILogModule = (*imlLogModule)(nil)
|
||||
|
||||
type imlLogModule struct {
|
||||
service log.ILogService `autowired:""`
|
||||
service log.ILogService `autowired:""`
|
||||
clusterService cluster.IClusterService `autowired:""`
|
||||
transaction store.ITransaction `autowired:""`
|
||||
}
|
||||
|
||||
var labels = map[string]string{
|
||||
"cluster": "$cluster",
|
||||
"node": "$node",
|
||||
}
|
||||
var logFormatter = map[string]interface{}{
|
||||
"fields": []string{
|
||||
"$msec",
|
||||
"$service",
|
||||
"$provider",
|
||||
"$scheme as request_scheme",
|
||||
"$url as request_uri",
|
||||
"$host as request_host",
|
||||
"$header as request_header",
|
||||
"$remote_addr",
|
||||
"$request_body",
|
||||
"$proxy_body",
|
||||
"$proxy_method",
|
||||
"$proxy_scheme",
|
||||
"$proxy_uri",
|
||||
"$api",
|
||||
"$proxy_host",
|
||||
"$proxy_header",
|
||||
"$proxy_addr",
|
||||
"$response_headers",
|
||||
"$status",
|
||||
"$content_type",
|
||||
"$proxy_status",
|
||||
"$request_time",
|
||||
"$response_time",
|
||||
"$node",
|
||||
"$cluster",
|
||||
"$application",
|
||||
"$src_ip",
|
||||
"$block_name as strategy",
|
||||
"$request_id",
|
||||
"$request_method",
|
||||
"$authorization",
|
||||
"$response_body",
|
||||
"$proxy_response_body",
|
||||
},
|
||||
}
|
||||
|
||||
func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.Save) error {
|
||||
factory, has := log_driver.GetFactory(driver)
|
||||
if !has {
|
||||
return errors.New("driver not found")
|
||||
}
|
||||
input.Cluster = cluster.DefaultClusterID
|
||||
var cfg *string
|
||||
if input.Config != nil {
|
||||
@@ -29,10 +84,54 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S
|
||||
tmp := string(data)
|
||||
cfg = &tmp
|
||||
}
|
||||
return i.service.UpdateLogSource(ctx, driver, &log.Save{
|
||||
ID: input.ID,
|
||||
Cluster: &input.Cluster,
|
||||
Config: cfg,
|
||||
return i.transaction.Transaction(ctx, func(txCtx context.Context) error {
|
||||
err := i.service.UpdateLogSource(ctx, driver, &log.Save{
|
||||
ID: input.ID,
|
||||
Cluster: &input.Cluster,
|
||||
Config: cfg,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
info, err := i.service.GetLogSource(ctx, driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d, c, err := factory.Create(info.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := i.clusterService.GatewayClient(ctx, input.Cluster)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dynamicClient, err := client.Dynamic(driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
attr := make(map[string]interface{})
|
||||
attr["driver"] = driver
|
||||
attr["formatter"] = logFormatter
|
||||
attr["labels"] = labels
|
||||
attr["method"] = "POST"
|
||||
for k, v := range c {
|
||||
attr[k] = v
|
||||
}
|
||||
err = dynamicClient.Online(ctx, &gateway.DynamicRelease{
|
||||
BasicItem: &gateway.BasicItem{
|
||||
ID: driver,
|
||||
Description: "collect access log",
|
||||
Version: time.Now().Format("20060102150405"),
|
||||
Resource: gateway.ProfessionOutput,
|
||||
},
|
||||
Attr: attr,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log_driver.SetDriver(driver, d)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
+2
-14
@@ -35,7 +35,7 @@ func (i *imlLogService) OnComplete() {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
driver, err := factory.Create(s.Config)
|
||||
driver, _, err := factory.Create(s.Config)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -45,10 +45,6 @@ func (i *imlLogService) OnComplete() {
|
||||
}
|
||||
|
||||
func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, input *Save) error {
|
||||
factory, has := log_driver.GetFactory(driver)
|
||||
if !has {
|
||||
return errors.New("driver not found")
|
||||
}
|
||||
s, err := i.store.First(ctx, map[string]interface{}{"driver": driver})
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
@@ -83,15 +79,7 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
|
||||
s.Updater = utils.UserId(ctx)
|
||||
s.UpdateAt = time.Now()
|
||||
}
|
||||
newDriver, err := factory.Create(s.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = i.store.Save(ctx, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log_driver.SetDriver(driver, newDriver)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user