AI API token quantity docking completed

This commit is contained in:
Liujian
2025-01-07 12:49:23 +08:00
parent 7fcfbb2a54
commit 1d09965d9a
9 changed files with 251 additions and 19 deletions
+1
View File
@@ -0,0 +1 @@
/config.yml
+72
View File
@@ -0,0 +1,72 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/eolinker/go-common/autowire"
nsq "github.com/nsqio/go-nsq"
"github.com/eolinker/go-common/cftool"
_ "github.com/eolinker/go-common/store/store_mysql"
_ "github.com/go-sql-driver/mysql"
)
var (
version string
confPath string
)
func init() {
flag.StringVar(&confPath, "c", "config.yml", "`config` file path for server ")
}
type ServerConfig struct {
Port int `yaml:"port"`
}
func main() {
// 1. 连接 MySQL 数据库
cftool.Register[ServerConfig](fmt.Sprintf("root:%s", confPath))
cftool.ReadFile(confPath)
handler := &NSQHandler{}
autowire.Autowired(handler)
err := autowire.CheckComplete()
if err != nil {
log.Fatal("check autowired:", err)
return
}
// 2. 创建 NSQ 消费者
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("ai_event", "tmp", config)
if err != nil {
log.Fatalf("Failed to create NSQ consumer: %v", err)
}
consumer.AddHandler(handler)
// 4. 连接到 NSQ
nsqAddress := "172.18.166.219:9150" // NSQ 地址
err = consumer.ConnectToNSQD(nsqAddress)
if err != nil {
log.Fatalf("Failed to connect to NSQ: %v", err)
}
log.Println("Connected to NSQ")
// 5. 捕获系统信号,优雅关闭
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// 优雅停止消费者
consumer.Stop()
<-consumer.StopChan
log.Println("NSQ Consumer stopped")
}
+100
View File
@@ -0,0 +1,100 @@
package main
import (
"context"
"encoding/json"
"log"
"time"
nsq "github.com/nsqio/go-nsq"
ai_api "github.com/APIParkLab/APIPark/service/ai-api"
)
// 定义 NSQ 消息结构
type AIProviderStatus struct {
Provider string `json:"provider"`
Model string `json:"model"`
Key string `json:"key"`
Status string `json:"status"`
}
type AIInfo struct {
Model string `json:"ai_model"`
Cost interface{} `json:"ai_model_cost"`
InputToken interface{} `json:"ai_model_input_token"`
OutputToken interface{} `json:"ai_model_output_token"`
TotalToken interface{} `json:"ai_model_total_token"`
Provider string `json:"ai_provider"`
ProviderStats []AIProviderStatus `json:"ai_provider_statuses"`
}
type NSQMessage struct {
AI AIInfo `json:"ai"`
API string `json:"api"`
Provider string `json:"provider"`
RequestID string `json:"request_id"`
TimeISO8601 string `json:"time_iso8601"`
}
// NSQHandler 处理 NSQ 消息并写入 MySQL
type NSQHandler struct {
service ai_api.IAPIUseService `autowired:""`
ctx context.Context
}
func convertInt(value interface{}) int {
switch v := value.(type) {
case int:
return v
case float64:
return int(v)
default:
return 0
}
}
// HandleMessage 处理从 NSQ 读取的消息
func (h *NSQHandler) HandleMessage(message *nsq.Message) error {
log.Printf("Received message: %s", string(message.Body))
// 解析消息为结构体
var data NSQMessage
err := json.Unmarshal(message.Body, &data)
if err != nil {
log.Printf("Failed to unmarshal message: %v", err)
return err
}
// 将时间字符串转换为 time.Time
timestamp, err := time.Parse(time.RFC3339, data.TimeISO8601)
if err != nil {
log.Printf("Failed to parse timestamp: %v", err)
return err
}
day := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location())
hour := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), 0, 0, 0, timestamp.Location())
minute := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), 0, 0, timestamp.Location())
// 调用 AI API 接口
err = h.service.Incr(context.Background(), &ai_api.IncrAPIUse{
API: data.API,
Service: data.Provider,
Provider: data.AI.Provider,
Model: data.AI.Model,
Day: day.Unix(),
Hour: hour.Unix(),
Minute: minute.Unix(),
InputToken: convertInt(data.AI.InputToken),
OutputToken: convertInt(data.AI.OutputToken),
TotalToken: convertInt(data.AI.TotalToken),
})
if err != nil {
log.Printf("Failed to call AI API: %v", err)
return err
}
log.Printf("Message processed and saved to MySQL: %+v", data)
return nil
}
+3 -1
View File
@@ -11,8 +11,10 @@ require (
github.com/gabriel-vasile/mimetype v1.4.4
github.com/getkin/kin-openapi v0.127.0
github.com/gin-gonic/gin v1.10.0
github.com/go-sql-driver/mysql v1.7.0
github.com/google/uuid v1.6.0
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/nsqio/go-nsq v1.1.0
github.com/urfave/cli v1.22.16
golang.org/x/crypto v0.24.0
gopkg.in/yaml.v3 v3.0.1
@@ -37,8 +39,8 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
+5
View File
@@ -64,6 +64,9 @@ github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -107,6 +110,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo=
github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+49 -15
View File
@@ -3,9 +3,10 @@ package ai_api
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/eolinker/go-common/utils"
"gorm.io/gorm"
"github.com/APIParkLab/APIPark/service/universally"
"github.com/APIParkLab/APIPark/stores/api"
@@ -87,7 +88,9 @@ func updateHandler(e *api.AiAPIInfo, i *Edit) {
if i.Disable != nil {
e.Disable = *i.Disable
}
if i.UseToken != nil {
e.UseToken = *i.UseToken
}
e.UpdateAt = time.Now()
}
@@ -97,6 +100,36 @@ type imlAPIUseService struct {
store api.IAiAPIUseStore `autowired:""`
}
func (i *imlAPIUseService) Incr(ctx context.Context, incr *IncrAPIUse) error {
info, err := i.store.First(ctx, map[string]interface{}{
"api": incr.API,
"service": incr.Service,
"provider": incr.Provider,
"model": incr.Model,
"day": incr.Day,
"hour": incr.Hour,
"minute": incr.Minute,
})
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
info = &api.AiAPIUse{
API: incr.API,
Service: incr.Service,
Provider: incr.Provider,
Model: incr.Model,
Day: incr.Day,
Hour: incr.Hour,
Minute: incr.Minute,
}
}
info.InputToken += incr.InputToken
info.OutputToken += incr.OutputToken
info.TotalToken += incr.TotalToken
return i.store.Save(ctx, info)
}
func (i *imlAPIUseService) SumByApisPage(ctx context.Context, providerId string, start, end int64, offset, limit int, order string, apiIds ...string) ([]*APIUse, int64, error) {
list, total, err := i.store.SumByGroupPage(ctx, "api", order, offset, limit, "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end)
if err != nil {
@@ -116,17 +149,18 @@ func (i *imlAPIUseService) SumByApisPage(ctx context.Context, providerId string,
}
func (i *imlAPIUseService) SumByApis(ctx context.Context, providerId string, start, end int64, apiIds ...string) ([]*APIUse, error) {
list, err := i.store.SumByGroup(ctx, "api", "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end)
if err != nil {
return nil, err
}
return utils.SliceToSlice(list, func(v *api.AiAPIUse) *APIUse {
return &APIUse{
API: v.API,
InputToken: v.InputToken,
OutputToken: v.OutputToken,
TotalToken: v.TotalToken,
}
}), nil
//list, err := i.store.SumByGroup(ctx, "api", "api,sum(input_token) as input_token,sum(output_token) as output_token,sum(total_token) as total_token", "provider = ? and api in (?) and minute >= ? and minute <= ?", providerId, apiIds, start, end)
//if err != nil {
// return nil, err
//}
//
//return utils.SliceToSlice(list, func(v *api.AiAPIUse) *APIUse {
// return &APIUse{
// API: v.API,
// InputToken: v.InputToken,
// OutputToken: v.OutputToken,
// TotalToken: v.TotalToken,
// }
//}), nil
return nil, nil
}
+16
View File
@@ -19,6 +19,7 @@ type API struct {
Provider string
CreateAt time.Time
UpdateAt time.Time
UseToken int
Creator string
Updater string
AdditionalConfig map[string]interface{}
@@ -48,6 +49,7 @@ type Edit struct {
Provider *string
Model *string
Disable *bool
UseToken *int
AdditionalConfig *map[string]interface{}
}
@@ -70,6 +72,7 @@ func FromEntity(e *api.AiAPIInfo) *API {
Creator: e.Creator,
Updater: e.Updater,
Disable: e.Disable,
UseToken: e.UseToken,
AdditionalConfig: cfg,
}
}
@@ -80,3 +83,16 @@ type APIUse struct {
OutputToken int
TotalToken int
}
type IncrAPIUse struct {
API string
Service string
Provider string
Model string
Day int64
Hour int64
Minute int64
InputToken int
OutputToken int
TotalToken int
}
+1
View File
@@ -19,6 +19,7 @@ type IAPIService interface {
type IAPIUseService interface {
SumByApis(ctx context.Context, providerId string, start, end int64, apiIds ...string) ([]*APIUse, error)
SumByApisPage(ctx context.Context, providerId string, start, end int64, page, pageSize int, order string, apiIds ...string) ([]*APIUse, int64, error)
Incr(ctx context.Context, incr *IncrAPIUse) error
}
func init() {
+4 -3
View File
@@ -103,9 +103,10 @@ type AiAPIUse struct {
API string `gorm:"size:36;not null;column:api;comment:API;index:api"`
Service string `gorm:"size:36;not null;column:service;comment:服务;index:service"`
Provider string `gorm:"size:36;not null;column:provider;comment:提供者;index:provider"`
Day int `gorm:"type:int(11);not null;column:day;comment:当前日期"`
Hour int `gorm:"type:int(11);not null;column:hour;comment:当前小时"`
Minute int `gorm:"type:int(11);not null;column:minute;comment:当前分钟"`
Model string `gorm:"size:255;not null;column:model;comment:模型"`
Day int64 `gorm:"type:int(11);not null;column:day;comment:当前日期"`
Hour int64 `gorm:"type:int(11);not null;column:hour;comment:当前小时"`
Minute int64 `gorm:"type:int(11);not null;column:minute;comment:当前分钟"`
InputToken int `gorm:"type:int(11);not null;column:input_token;comment:输入token"`
OutputToken int `gorm:"type:int(11);not null;column:output_token;comment:输出token"`
TotalToken int `gorm:"type:int(11);not null;column:total_token;comment:总token"`