diff --git a/app/ai-event-handler/nsq.go b/app/ai-event-handler/nsq.go index 6093d930..2611a3d1 100644 --- a/app/ai-event-handler/nsq.go +++ b/app/ai-event-handler/nsq.go @@ -4,8 +4,17 @@ import ( "context" "encoding/json" "log" + "strings" "time" + ai_dto "github.com/APIParkLab/APIPark/module/ai/dto" + + "github.com/eolinker/go-common/store" + + "github.com/APIParkLab/APIPark/service/ai" + + ai_key "github.com/APIParkLab/APIPark/service/ai-key" + nsq "github.com/nsqio/go-nsq" ai_api "github.com/APIParkLab/APIPark/service/ai-api" @@ -39,8 +48,11 @@ type NSQMessage struct { // NSQHandler 处理 NSQ 消息并写入 MySQL type NSQHandler struct { - service ai_api.IAPIUseService `autowired:""` - ctx context.Context + apiUseService ai_api.IAPIUseService `autowired:""` + aiKeyService ai_key.IKeyService `autowired:""` + aiService ai.IProviderService `autowired:""` + transaction store.ITransaction `autowired:""` + ctx context.Context } func convertInt(value interface{}) int { @@ -76,25 +88,57 @@ func (h *NSQHandler) HandleMessage(message *nsq.Message) error { day := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location()) hour := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), 0, 0, 0, timestamp.Location()) minute := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), 0, 0, timestamp.Location()) + return h.transaction.Transaction(context.Background(), func(ctx context.Context) error { + finalStatus := &AIProviderStatus{} + for _, s := range data.AI.ProviderStats { + status := ToKeyStatus(s.Status).Int() + keys := strings.Split(s.Key, "@") + key := keys[0] + err = h.aiKeyService.Save(ctx, key, &ai_key.Edit{ + Status: &status, + }) + if err != nil { + log.Printf("Failed to save AI key: %v", err) + return err + } + if s.Provider != data.AI.Provider { - // 调用 AI API 接口 - err = h.service.Incr(context.Background(), &ai_api.IncrAPIUse{ - API: data.API, - Service: data.Provider, - Provider: data.AI.Provider, - Model: data.AI.Model, - Day: day.Unix(), - Hour: hour.Unix(), - Minute: minute.Unix(), - InputToken: convertInt(data.AI.InputToken), - OutputToken: convertInt(data.AI.OutputToken), - TotalToken: convertInt(data.AI.TotalToken), + pStatus := ai_dto.ProviderAbnormal.Int() + err = h.aiService.Save(ctx, s.Provider, &ai.SetProvider{ + Status: &pStatus, + }) + } + finalStatus = &s + } + if finalStatus != nil { + keys := strings.Split(finalStatus.Key, "@") + err = h.aiKeyService.IncrUseToken(ctx, keys[0], convertInt(data.AI.TotalToken)) + if err != nil { + log.Printf("Failed to increment AI key token: %v", err) + return err + } + } + + // 调用 AI API 接口 + err = h.apiUseService.Incr(context.Background(), &ai_api.IncrAPIUse{ + API: data.API, + Service: data.Provider, + Provider: data.AI.Provider, + Model: data.AI.Model, + Day: day.Unix(), + Hour: hour.Unix(), + Minute: minute.Unix(), + InputToken: convertInt(data.AI.InputToken), + OutputToken: convertInt(data.AI.OutputToken), + TotalToken: convertInt(data.AI.TotalToken), + }) + if err != nil { + log.Printf("Failed to call AI API: %v", err) + return err + } + + log.Printf("Message processed and saved to MySQL: %+v", data) + return nil }) - if err != nil { - log.Printf("Failed to call AI API: %v", err) - return err - } - log.Printf("Message processed and saved to MySQL: %+v", data) - return nil } diff --git a/app/ai-event-handler/status.go b/app/ai-event-handler/status.go new file mode 100644 index 00000000..9b1ff61d --- /dev/null +++ b/app/ai-event-handler/status.go @@ -0,0 +1,34 @@ +package main + +import ai_key_dto "github.com/APIParkLab/APIPark/module/ai-key/dto" + +var ( + StatusNormal = "normal" + StatusInvalidRequest = "invalid request" + StatusQuotaExhausted = "quota exhausted" + StatusExpired = "expired" + StatusExceeded = "exceeded" + StatusInvalid = "invalid" + StatusTimeout = "timeout" +) + +func ToKeyStatus(status string) ai_key_dto.KeyStatus { + switch status { + case StatusNormal: + return ai_key_dto.KeyNormal + case StatusInvalidRequest: + return ai_key_dto.KeyNormal + case StatusQuotaExhausted: + return ai_key_dto.KeyExceed + case StatusExpired: + return ai_key_dto.KeyExpired + case StatusExceeded: + return ai_key_dto.KeyNormal + case StatusInvalid: + return ai_key_dto.KeyError + case StatusTimeout: + return ai_key_dto.KeyError + default: + return ai_key_dto.KeyNormal + } +} diff --git a/module/ai/iml.go b/module/ai/iml.go index 1b1cc324..88e73454 100644 --- a/module/ai/iml.go +++ b/module/ai/iml.go @@ -425,7 +425,7 @@ func (i *imlProviderModule) Provider(ctx context.Context, id string) (*ai_dto.Pr DefaultLLM: defaultLLM.ID(), DefaultLLMConfig: defaultLLM.DefaultConfig(), Priority: info.Priority, - Status: ai_dto.ProviderEnabled, + Status: ai_dto.ToProviderStatus(info.Status), Configured: true, }, nil } diff --git a/service/ai-api/iml.go b/service/ai-api/iml.go index baa3b2d3..97ffbe31 100644 --- a/service/ai-api/iml.go +++ b/service/ai-api/iml.go @@ -35,7 +35,7 @@ func (i *imlAPIService) OnComplete() { } func labelHandler(e *api.AiAPIInfo) []string { - return []string{e.Name, e.Uuid} + return []string{e.Name} } func uniquestHandler(i *Create) []map[string]interface{} { return []map[string]interface{}{{"uuid": i.ID}} diff --git a/service/ai-key/iml.go b/service/ai-key/iml.go index b8eb314c..302ec182 100644 --- a/service/ai-key/iml.go +++ b/service/ai-key/iml.go @@ -23,6 +23,16 @@ type imlAIKeyService struct { universally.IServiceDelete } +func (i *imlAIKeyService) IncrUseToken(ctx context.Context, id string, useToken int) error { + info, err := i.store.GetByUUID(ctx, id) + if err != nil { + return err + } + + info.UseToken += useToken + return i.store.Save(ctx, info) +} + func (i *imlAIKeyService) SearchUnExpiredByPage(ctx context.Context, w map[string]interface{}, page, pageSize int, order string) ([]*Key, int64, error) { sql := "(expire_time = 0 || expire_time > ?)" args := []interface{}{time.Now().Unix()} @@ -192,7 +202,7 @@ func (i *imlAIKeyService) OnComplete() { } func labelHandler(e *ai.Key) []string { - return []string{e.Name, e.Uuid} + return []string{e.Name} } func uniquestHandler(i *Create) []map[string]interface{} { return []map[string]interface{}{{"uuid": i.ID}} @@ -229,5 +239,6 @@ func updateHandler(e *ai.Key, i *Edit) { if i.Priority != nil { e.Sort = *i.Priority } + e.UpdateAt = time.Now() } diff --git a/service/ai-key/service.go b/service/ai-key/service.go index f427efd4..8bbe6d83 100644 --- a/service/ai-key/service.go +++ b/service/ai-key/service.go @@ -21,6 +21,7 @@ type IKeyService interface { SortAfter(ctx context.Context, provider string, originID string, targetID string) ([]*Key, error) KeysAfterPriority(ctx context.Context, providerId string, priority int) ([]*Key, error) SearchUnExpiredByPage(ctx context.Context, w map[string]interface{}, page, pageSize int, order string) ([]*Key, int64, error) + IncrUseToken(ctx context.Context, id string, useToken int) error } func init() {