From 1d09965d9a05283b629aa4a41299015f9e3413d8 Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Tue, 7 Jan 2025 12:49:23 +0800 Subject: [PATCH] AI API token quantity docking completed --- app/ai-event-handler/.gitignore | 1 + app/ai-event-handler/main.go | 72 +++++++++++++++++++++++ app/ai-event-handler/nsq.go | 100 ++++++++++++++++++++++++++++++++ go.mod | 4 +- go.sum | 5 ++ service/ai-api/iml.go | 64 +++++++++++++++----- service/ai-api/model.go | 16 +++++ service/ai-api/service.go | 1 + stores/api/model.go | 7 ++- 9 files changed, 251 insertions(+), 19 deletions(-) create mode 100644 app/ai-event-handler/.gitignore create mode 100644 app/ai-event-handler/main.go create mode 100644 app/ai-event-handler/nsq.go diff --git a/app/ai-event-handler/.gitignore b/app/ai-event-handler/.gitignore new file mode 100644 index 00000000..a0ec5967 --- /dev/null +++ b/app/ai-event-handler/.gitignore @@ -0,0 +1 @@ +/config.yml diff --git a/app/ai-event-handler/main.go b/app/ai-event-handler/main.go new file mode 100644 index 00000000..7fa9271a --- /dev/null +++ b/app/ai-event-handler/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/eolinker/go-common/autowire" + nsq "github.com/nsqio/go-nsq" + + "github.com/eolinker/go-common/cftool" + + _ "github.com/eolinker/go-common/store/store_mysql" + _ "github.com/go-sql-driver/mysql" +) + +var ( + version string + confPath string +) + +func init() { + flag.StringVar(&confPath, "c", "config.yml", "`config` file path for server ") +} + +type ServerConfig struct { + Port int `yaml:"port"` +} + +func main() { + // 1. 连接 MySQL 数据库 + cftool.Register[ServerConfig](fmt.Sprintf("root:%s", confPath)) + cftool.ReadFile(confPath) + + handler := &NSQHandler{} + autowire.Autowired(handler) + err := autowire.CheckComplete() + if err != nil { + log.Fatal("check autowired:", err) + return + } + + // 2. 创建 NSQ 消费者 + config := nsq.NewConfig() + consumer, err := nsq.NewConsumer("ai_event", "tmp", config) + if err != nil { + log.Fatalf("Failed to create NSQ consumer: %v", err) + } + + consumer.AddHandler(handler) + + // 4. 连接到 NSQ + nsqAddress := "172.18.166.219:9150" // NSQ 地址 + err = consumer.ConnectToNSQD(nsqAddress) + if err != nil { + log.Fatalf("Failed to connect to NSQ: %v", err) + } + log.Println("Connected to NSQ") + + // 5. 捕获系统信号,优雅关闭 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + // 优雅停止消费者 + consumer.Stop() + <-consumer.StopChan + log.Println("NSQ Consumer stopped") +} diff --git a/app/ai-event-handler/nsq.go b/app/ai-event-handler/nsq.go new file mode 100644 index 00000000..6093d930 --- /dev/null +++ b/app/ai-event-handler/nsq.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "time" + + nsq "github.com/nsqio/go-nsq" + + ai_api "github.com/APIParkLab/APIPark/service/ai-api" +) + +// 定义 NSQ 消息结构 +type AIProviderStatus struct { + Provider string `json:"provider"` + Model string `json:"model"` + Key string `json:"key"` + Status string `json:"status"` +} + +type AIInfo struct { + Model string `json:"ai_model"` + Cost interface{} `json:"ai_model_cost"` + InputToken interface{} `json:"ai_model_input_token"` + OutputToken interface{} `json:"ai_model_output_token"` + TotalToken interface{} `json:"ai_model_total_token"` + Provider string `json:"ai_provider"` + ProviderStats []AIProviderStatus `json:"ai_provider_statuses"` +} + +type NSQMessage struct { + AI AIInfo `json:"ai"` + API string `json:"api"` + Provider string `json:"provider"` + RequestID string `json:"request_id"` + TimeISO8601 string `json:"time_iso8601"` +} + +// NSQHandler 处理 NSQ 消息并写入 MySQL +type NSQHandler struct { + service ai_api.IAPIUseService `autowired:""` + ctx context.Context +} + +func convertInt(value interface{}) int { + switch v := value.(type) { + case int: + return v + case float64: + return int(v) + default: + return 0 + } +} + +// HandleMessage 处理从 NSQ 读取的消息 +func (h *NSQHandler) HandleMessage(message *nsq.Message) error { + log.Printf("Received message: %s", string(message.Body)) + + // 解析消息为结构体 + var data NSQMessage + err := json.Unmarshal(message.Body, &data) + if err != nil { + log.Printf("Failed to unmarshal message: %v", err) + return err + } + + // 将时间字符串转换为 time.Time + timestamp, err := time.Parse(time.RFC3339, data.TimeISO8601) + if err != nil { + log.Printf("Failed to parse timestamp: %v", err) + return err + } + + day := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location()) + hour := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), 0, 0, 0, timestamp.Location()) + minute := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), 0, 0, timestamp.Location()) + + // 调用 AI API 接口 + err = h.service.Incr(context.Background(), &ai_api.IncrAPIUse{ + API: data.API, + Service: data.Provider, + Provider: data.AI.Provider, + Model: data.AI.Model, + Day: day.Unix(), + Hour: hour.Unix(), + Minute: minute.Unix(), + InputToken: convertInt(data.AI.InputToken), + OutputToken: convertInt(data.AI.OutputToken), + TotalToken: convertInt(data.AI.TotalToken), + }) + if err != nil { + log.Printf("Failed to call AI API: %v", err) + return err + } + + log.Printf("Message processed and saved to MySQL: %+v", data) + return nil +} diff --git a/go.mod b/go.mod index 4da9e3f4..fb4396bf 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,10 @@ require ( github.com/gabriel-vasile/mimetype v1.4.4 github.com/getkin/kin-openapi v0.127.0 github.com/gin-gonic/gin v1.10.0 + github.com/go-sql-driver/mysql v1.7.0 github.com/google/uuid v1.6.0 github.com/influxdata/influxdb-client-go/v2 v2.14.0 + github.com/nsqio/go-nsq v1.1.0 github.com/urfave/cli v1.22.16 golang.org/x/crypto v0.24.0 gopkg.in/yaml.v3 v3.0.1 @@ -37,8 +39,8 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect - github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/goccy/go-json v0.10.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect diff --git a/go.sum b/go.sum index 8d27d9f5..01c9b855 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,9 @@ github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -107,6 +110,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/service/ai-api/iml.go b/service/ai-api/iml.go index 97e157c2..baa3b2d3 100644 --- a/service/ai-api/iml.go +++ b/service/ai-api/iml.go @@ -3,9 +3,10 @@ package ai_api import ( "context" "encoding/json" + "errors" "time" - "github.com/eolinker/go-common/utils" + "gorm.io/gorm" "github.com/APIParkLab/APIPark/service/universally" "github.com/APIParkLab/APIPark/stores/api" @@ -87,7 +88,9 @@ func updateHandler(e *api.AiAPIInfo, i *Edit) { if i.Disable != nil { e.Disable = *i.Disable } - + if i.UseToken != nil { + e.UseToken = *i.UseToken + } e.UpdateAt = time.Now() } @@ -97,6 +100,36 @@ type imlAPIUseService struct { store api.IAiAPIUseStore `autowired:""` } +func (i *imlAPIUseService) Incr(ctx context.Context, incr *IncrAPIUse) error { + info, err := i.store.First(ctx, map[string]interface{}{ + "api": incr.API, + "service": incr.Service, + "provider": incr.Provider, + "model": incr.Model, + "day": incr.Day, + "hour": incr.Hour, + "minute": incr.Minute, + }) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + info = &api.AiAPIUse{ + API: incr.API, + Service: incr.Service, + Provider: incr.Provider, + Model: incr.Model, + Day: incr.Day, + Hour: incr.Hour, + Minute: incr.Minute, + } + } + info.InputToken += incr.InputToken + info.OutputToken += incr.OutputToken + info.TotalToken += incr.TotalToken + return i.store.Save(ctx, info) +} + func (i *imlAPIUseService) SumByApisPage(ctx context.Context, providerId string, start, end int64, offset, limit int, order string, apiIds ...string) ([]*APIUse, int64, error) { list, total, err := i.store.SumByGroupPage(ctx, "api", order, offset, limit, "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end) if err != nil { @@ -116,17 +149,18 @@ func (i *imlAPIUseService) SumByApisPage(ctx context.Context, providerId string, } func (i *imlAPIUseService) SumByApis(ctx context.Context, providerId string, start, end int64, apiIds ...string) ([]*APIUse, error) { - list, err := i.store.SumByGroup(ctx, "api", "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end) - if err != nil { - return nil, err - } - - return utils.SliceToSlice(list, func(v *api.AiAPIUse) *APIUse { - return &APIUse{ - API: v.API, - InputToken: v.InputToken, - OutputToken: v.OutputToken, - TotalToken: v.TotalToken, - } - }), nil + //list, err := i.store.SumByGroup(ctx, "api", "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end) + //if err != nil { + // return nil, err + //} + // + //return utils.SliceToSlice(list, func(v *api.AiAPIUse) *APIUse { + // return &APIUse{ + // API: v.API, + // InputToken: v.InputToken, + // OutputToken: v.OutputToken, + // TotalToken: v.TotalToken, + // } + //}), nil + return nil, nil } diff --git a/service/ai-api/model.go b/service/ai-api/model.go index 9ba1e635..b8ee9cb7 100644 --- a/service/ai-api/model.go +++ b/service/ai-api/model.go @@ -19,6 +19,7 @@ type API struct { Provider string CreateAt time.Time UpdateAt time.Time + UseToken int Creator string Updater string AdditionalConfig map[string]interface{} @@ -48,6 +49,7 @@ type Edit struct { Provider *string Model *string Disable *bool + UseToken *int AdditionalConfig *map[string]interface{} } @@ -70,6 +72,7 @@ func FromEntity(e *api.AiAPIInfo) *API { Creator: e.Creator, Updater: e.Updater, Disable: e.Disable, + UseToken: e.UseToken, AdditionalConfig: cfg, } } @@ -80,3 +83,16 @@ type APIUse struct { OutputToken int TotalToken int } + +type IncrAPIUse struct { + API string + Service string + Provider string + Model string + Day int64 + Hour int64 + Minute int64 + InputToken int + OutputToken int + TotalToken int +} diff --git a/service/ai-api/service.go b/service/ai-api/service.go index fc411a3a..de841fda 100644 --- a/service/ai-api/service.go +++ b/service/ai-api/service.go @@ -19,6 +19,7 @@ type IAPIService interface { type IAPIUseService interface { SumByApis(ctx context.Context, providerId string, start, end int64, apiIds ...string) ([]*APIUse, error) SumByApisPage(ctx context.Context, providerId string, start, end int64, page, pageSize int, order string, apiIds ...string) ([]*APIUse, int64, error) + Incr(ctx context.Context, incr *IncrAPIUse) error } func init() { diff --git a/stores/api/model.go b/stores/api/model.go index b4b1add3..1cfb6106 100644 --- a/stores/api/model.go +++ b/stores/api/model.go @@ -103,9 +103,10 @@ type AiAPIUse struct { API string `gorm:"size:36;not null;column:api;comment:API;index:api"` Service string `gorm:"size:36;not null;column:service;comment:服务;index:service"` Provider string `gorm:"size:36;not null;column:provider;comment:提供者;index:provider"` - Day int `gorm:"type:int(11);not null;column:day;comment:当前日期"` - Hour int `gorm:"type:int(11);not null;column:hour;comment:当前小时"` - Minute int `gorm:"type:int(11);not null;column:minute;comment:当前分钟"` + Model string `gorm:"size:255;not null;column:model;comment:模型"` + Day int64 `gorm:"type:int(11);not null;column:day;comment:当前日期"` + Hour int64 `gorm:"type:int(11);not null;column:hour;comment:当前小时"` + Minute int64 `gorm:"type:int(11);not null;column:minute;comment:当前分钟"` InputToken int `gorm:"type:int(11);not null;column:input_token;comment:输入token"` OutputToken int `gorm:"type:int(11);not null;column:output_token;comment:输出token"` TotalToken int `gorm:"type:int(11);not null;column:total_token;comment:总token"`