package gaia import ( "context" "encoding/json" "fmt" "github.com/flipped-aurora/gin-vue-admin/server/model/system" "github.com/flipped-aurora/gin-vue-admin/server/utils" "strconv" "strings" "sync" "time" "github.com/flipped-aurora/gin-vue-admin/server/global" "github.com/flipped-aurora/gin-vue-admin/server/model/gaia" ) // 全局工作池实例 var globalWorkerPool *WorkerPool // WorkerPool 工作池管理器 type WorkerPool struct { ctx context.Context cancel context.CancelFunc totalWorkers int // 总工作器数量 userWorkers map[uint]*gaia.UserWorkerAllocation // 每个用户的工作器分配 userTaskChan map[uint]chan *gaia.BatchWorkflowTask // 每个用户的任务队列 runningWorkers map[uint]int // 每个用户当前运行的worker数量 wg sync.WaitGroup batchService *BatchWorkflowService running bool mutex sync.RWMutex userMutex sync.RWMutex } // NewWorkerPool 创建新的工作池 func NewWorkerPool(totalWorkers int) *WorkerPool { ctx, cancel := context.WithCancel(context.Background()) return &WorkerPool{ ctx: ctx, cancel: cancel, totalWorkers: totalWorkers, userWorkers: make(map[uint]*gaia.UserWorkerAllocation), userTaskChan: make(map[uint]chan *gaia.BatchWorkflowTask), runningWorkers: make(map[uint]int), batchService: &BatchWorkflowService{}, running: false, } } // calculateWorkerCountWithErrorPenalty 根据错误次数计算工作器数量 // 每50个错误减少1个并发位,最少保留1个并发位 func (wp *WorkerPool) calculateWorkerCountWithErrorPenalty(baseWorkers int, errorCount int) int { if baseWorkers <= 0 { return 1 } // 计算错误惩罚:每50个错误减少1个并发位 penalty := errorCount / gaia.ErrorPenaltyThreshold adjustedWorkers := baseWorkers - penalty // 确保至少保留1个并发位 if adjustedWorkers < 1 { adjustedWorkers = 1 } return adjustedWorkers } // calculateUserWorkerAllocation 计算用户工作器分配 func (wp *WorkerPool) calculateUserWorkerAllocation() { wp.userMutex.Lock() defer wp.userMutex.Unlock() // 获取有批量任务的活跃用户(按需分配) // 排除超过重试次数的任务,只考虑可以继续处理的任务 // 分两个查询:1. 获取有活跃任务的用户,2. 获取用户的累计错误次数 // 第一个查询:获取有活跃批量任务的用户 var activeUserIDs []uint err := global.GVA_DB.Raw(` SELECT DISTINCT bw.user_id FROM batch_workflows_extend bw INNER JOIN sys_users su ON bw.user_id = su.id INNER JOIN batch_workflow_tasks_extend bwt ON bw.id = bwt.batch_workflow_id WHERE su.enable = ? AND bw.status IN (?, ?) AND (bwt.status IN (?, ?) AND bwt.error_count < ?) `, system.UserActive, gaia.BatchWorkflowStatusPending, gaia.BatchWorkflowStatusProcessing, gaia.BatchTaskStatusPending, gaia.BatchTaskStatusQueued, gaia.MaxTaskRetryCount).Scan(&activeUserIDs).Error if err != nil { global.GVA_LOG.Error("获取有批量任务的活跃用户失败: " + err.Error()) return } // 第二个查询:获取这些用户的累计错误次数和最小 total_rows(pending 状态工作流) var userErrorInfos []gaia.UserErrorInfo if len(activeUserIDs) > 0 { err = global.GVA_DB.Raw(` SELECT bw.user_id, COALESCE(SUM(bw.error_count), 0) AS error_count, COALESCE(SUM(bw.total_rows), 0) AS total_rows FROM batch_workflows_extend bw WHERE bw.user_id IN (?) AND bw.status='pending' GROUP BY bw.user_id `, activeUserIDs).Scan(&userErrorInfos).Error if err != nil { global.GVA_LOG.Error("获取用户累计错误次数失败: " + err.Error()) return } } // 提取活跃用户ID列表、错误次数映射和 total_rows 映射 userErrorMap := make(map[uint]int) userTotalRowsMap := make(map[uint]int) for _, info := range userErrorInfos { userErrorMap[info.UserID] = info.ErrorCount userTotalRowsMap[info.UserID] = info.TotalRows } userCount := len(activeUserIDs) if userCount == 0 { // 如果没有用户有批量任务,关闭所有队列 for _, ch := range wp.userTaskChan { close(ch) } wp.userWorkers = make(map[uint]*gaia.UserWorkerAllocation) wp.userTaskChan = make(map[uint]chan *gaia.BatchWorkflowTask) wp.runningWorkers = make(map[uint]int) return } // 创建活跃用户ID集合 activeUserIDMap := make(map[uint]bool) for _, userID := range activeUserIDs { activeUserIDMap[userID] = true } // 关闭不再有批量任务的用户的任务队列 for userID, ch := range wp.userTaskChan { if !activeUserIDMap[userID] { close(ch) delete(wp.userTaskChan, userID) delete(wp.userWorkers, userID) delete(wp.runningWorkers, userID) } } // 存储新的分配计算结果 newAllocations := make(map[uint]*gaia.UserWorkerAllocation) // 计算每个用户的优先级权重(total_rows 越小权重越大) // 权重:tier1=4, tier2=3, tier3=2, tier4=1 priorityWeightOf := func(totalRows int) int { switch { case totalRows <= gaia.PriorityTier1MaxRows: return 4 case totalRows <= gaia.PriorityTier2MaxRows: return 3 case totalRows <= gaia.PriorityTier3MaxRows: return 2 default: return 1 } } // 计算所有用户的权重总和,用于按比例分配 worker totalWeight := 0 for _, userID := range activeUserIDs { totalWeight += priorityWeightOf(userTotalRowsMap[userID]) } if totalWeight == 0 { totalWeight = userCount } // UserAllocationInfo 单用户分配中间计算结构 type UserAllocationInfo struct { UserID uint BaseWorkers int ActualWorkers int ErrorCount int PenaltyReduced int } // 按权重比例为每个用户分配基础 worker 数,余数补给权重最高的用户 userAllocations := make([]UserAllocationInfo, 0, userCount) allocatedBase := 0 for _, userID := range activeUserIDs { w := priorityWeightOf(userTotalRowsMap[userID]) base := wp.totalWorkers * w / totalWeight if base < 1 { base = 1 } userAllocations = append(userAllocations, UserAllocationInfo{ UserID: userID, BaseWorkers: base, ErrorCount: userErrorMap[userID], }) allocatedBase += base } // 将剩余 worker 补给权重最大的用户(已按 activeUserIDs 顺序,这里找最大权重的) remainder := wp.totalWorkers - allocatedBase if remainder > 0 { // 找权重最大的用户索引 maxW, maxIdx := 0, 0 for i, alloc := range userAllocations { w := priorityWeightOf(userTotalRowsMap[alloc.UserID]) if w > maxW { maxW, maxIdx = w, i } } userAllocations[maxIdx].BaseWorkers += remainder } // 应用错误惩罚并收集被释放的 worker totalPenaltyReduced := 0 for i := range userAllocations { actual := wp.calculateWorkerCountWithErrorPenalty( userAllocations[i].BaseWorkers, userAllocations[i].ErrorCount) userAllocations[i].PenaltyReduced = userAllocations[i].BaseWorkers - actual userAllocations[i].ActualWorkers = actual totalPenaltyReduced += userAllocations[i].PenaltyReduced } // 将惩罚释放的 worker 重新分配给无惩罚用户(按权重优先) if totalPenaltyReduced > 0 { // 按权重降序、错误数升序排序,优先分配给高优先级无惩罚用户 for i := 0; i < len(userAllocations)-1; i++ { for j := i + 1; j < len(userAllocations); j++ { wi := priorityWeightOf(userTotalRowsMap[userAllocations[i].UserID]) wj := priorityWeightOf(userTotalRowsMap[userAllocations[j].UserID]) if wi < wj || (wi == wj && userAllocations[i].ErrorCount > userAllocations[j].ErrorCount) { userAllocations[i], userAllocations[j] = userAllocations[j], userAllocations[i] } } } remainingToDistribute := totalPenaltyReduced eligibleUsers := 0 for _, a := range userAllocations { if a.PenaltyReduced == 0 { eligibleUsers++ } } if eligibleUsers > 0 { for i := 0; i < len(userAllocations) && remainingToDistribute > 0; i++ { if userAllocations[i].PenaltyReduced == 0 { extra := remainingToDistribute / eligibleUsers if extra < 1 { extra = 1 } if extra > remainingToDistribute { extra = remainingToDistribute } userAllocations[i].ActualWorkers += extra remainingToDistribute -= extra eligibleUsers-- } } } } // 写入最终分配,超出总量时截断(降级场景) allocatedWorkers := 0 for _, allocation := range userAllocations { workers := allocation.ActualWorkers remaining := wp.totalWorkers - allocatedWorkers if workers > remaining { workers = remaining } if workers > 0 { newAllocations[allocation.UserID] = &gaia.UserWorkerAllocation{ UserID: allocation.UserID, Workers: workers, MaxLimit: wp.totalWorkers, } allocatedWorkers += workers } if allocatedWorkers >= wp.totalWorkers { break } } if userCount > wp.totalWorkers { global.GVA_LOG.Warn(fmt.Sprintf("降级分配完成 - 总工作器: %d, 用户数: %d, 已分配: %d, 平均每用户: %.1f个", wp.totalWorkers, userCount, allocatedWorkers, float64(allocatedWorkers)/float64(userCount))) } // 应用新的分配,只更新有变化的用户 for userID, newAllocation := range newAllocations { oldAllocation, exists := wp.userWorkers[userID] if !exists { // 新用户,创建分配和任务队列 wp.userWorkers[userID] = newAllocation wp.userTaskChan[userID] = make(chan *gaia.BatchWorkflowTask, newAllocation.Workers*2) } else if oldAllocation.Workers != newAllocation.Workers { // 现有用户的工作器数量发生变化,需要重新创建任务队列 close(wp.userTaskChan[userID]) wp.userWorkers[userID] = newAllocation wp.userTaskChan[userID] = make(chan *gaia.BatchWorkflowTask, newAllocation.Workers*2) // 重置运行中的worker计数,让adjustWorkers重新启动 wp.runningWorkers[userID] = 0 } else { // 工作器数量没有变化,只更新分配信息 wp.userWorkers[userID] = newAllocation } } } // getUserWorkerCount 获取指定用户的工作器数量 func (wp *WorkerPool) getUserWorkerCount(userID uint) int { wp.userMutex.RLock() defer wp.userMutex.RUnlock() if allocation, exists := wp.userWorkers[userID]; exists { return allocation.Workers } return 0 } // Start 启动工作池 func (wp *WorkerPool) Start() { wp.mutex.Lock() defer wp.mutex.Unlock() if wp.running { return } // 计算用户工作器分配 wp.calculateUserWorkerAllocation() wp.running = true // 启动初始工作器 wp.startWorkers() // 启动任务调度器 wp.wg.Add(1) go wp.taskScheduler() // 启动用户工作器分配更新器 wp.wg.Add(1) go wp.userAllocationUpdater() // 启动动态工作器管理器 wp.wg.Add(1) go wp.dynamicWorkerManager() } // Stop 停止工作池 func (wp *WorkerPool) Stop() { wp.mutex.Lock() defer wp.mutex.Unlock() if !wp.running { return } wp.cancel() wp.running = false // 关闭所有用户的任务队列 wp.userMutex.Lock() for _, ch := range wp.userTaskChan { close(ch) } wp.userMutex.Unlock() // 等待所有goroutine完成 wp.wg.Wait() } // IsRunning 检查工作池是否运行中 func (wp *WorkerPool) IsRunning() bool { wp.mutex.RLock() defer wp.mutex.RUnlock() return wp.running } // GetStatus 获取工作池状态 func (wp *WorkerPool) GetStatus() map[string]interface{} { wp.mutex.RLock() defer wp.mutex.RUnlock() wp.userMutex.RLock() defer wp.userMutex.RUnlock() userAllocations := make(map[string]interface{}) for userID, allocation := range wp.userWorkers { userAllocations[fmt.Sprintf("user_%d", userID)] = allocation } // 计算所有用户队列的总长度 totalQueueLength := 0 userQueueLengths := make(map[string]int) for userID, ch := range wp.userTaskChan { queueLen := len(ch) totalQueueLength += queueLen userQueueLengths[fmt.Sprintf("user_%d", userID)] = queueLen } return map[string]interface{}{ "running": wp.running, "total_workers": wp.totalWorkers, "total_queue_length": totalQueueLength, "user_queue_lengths": userQueueLengths, "user_allocations": userAllocations, } } // userAllocationUpdater 用户工作器分配更新器 func (wp *WorkerPool) userAllocationUpdater() { defer wp.wg.Done() ticker := time.NewTicker(30 * time.Second) // 每30秒检查一次用户变化 defer ticker.Stop() for { select { case <-wp.ctx.Done(): return case <-ticker.C: wp.calculateUserWorkerAllocation() } } } // dynamicWorkerManager 动态工作器管理器 func (wp *WorkerPool) dynamicWorkerManager() { defer wp.wg.Done() defer global.GVA_LOG.Info("动态工作器管理器停止") ticker := time.NewTicker(10 * time.Second) // 每10秒检查一次工作器状态 defer ticker.Stop() for { select { case <-wp.ctx.Done(): return case <-ticker.C: wp.adjustWorkers() } } } // startWorkers 启动工作器 func (wp *WorkerPool) startWorkers() { wp.userMutex.Lock() defer wp.userMutex.Unlock() for userID, allocation := range wp.userWorkers { // 启动所有需要的worker for i := 0; i < allocation.Workers; i++ { wp.wg.Add(1) workerID := fmt.Sprintf("user_%d_worker_%d", userID, i) go wp.worker(workerID, userID) } // 更新运行中的worker数量 wp.runningWorkers[userID] = allocation.Workers } } // adjustWorkers 调整工作器数量 func (wp *WorkerPool) adjustWorkers() { // 重新计算用户分配 wp.calculateUserWorkerAllocation() // 检查哪些用户的工作器数量发生了变化,为它们启动新worker wp.userMutex.Lock() defer wp.userMutex.Unlock() for userID, allocation := range wp.userWorkers { runningCount := wp.runningWorkers[userID] neededCount := allocation.Workers if runningCount < neededCount { // 需要启动更多worker for i := runningCount; i < neededCount; i++ { wp.wg.Add(1) workerID := fmt.Sprintf("user_%d_worker_%d", userID, i) go wp.worker(workerID, userID) } wp.runningWorkers[userID] = neededCount } } } // worker 工作协程 func (wp *WorkerPool) worker(workerID string, userID uint) { defer wp.wg.Done() defer func() { // Worker退出时减少运行中的worker计数 wp.userMutex.Lock() if wp.runningWorkers[userID] > 0 { wp.runningWorkers[userID]-- } wp.userMutex.Unlock() }() // 获取用户专属的任务队列 wp.userMutex.RLock() userTaskChan, exists := wp.userTaskChan[userID] wp.userMutex.RUnlock() if !exists { global.GVA_LOG.Error(fmt.Sprintf("Worker %s: 用户 %d 的任务队列不存在", workerID, userID)) return } for { select { case <-wp.ctx.Done(): return case task, ok := <-userTaskChan: if !ok { // 任务队列已关闭 return } if task != nil { wp.processTask(task) } } } } // taskScheduler 任务调度器 func (wp *WorkerPool) taskScheduler() { defer wp.wg.Done() ticker := time.NewTicker(2 * time.Second) // 每2秒检查一次新任务 defer ticker.Stop() for { select { case <-wp.ctx.Done(): return case <-ticker.C: global.GVA_LOG.Debug("任务调度器开始检查新任务...") wp.fetchAndScheduleTasks() } } } // fetchAndScheduleTasks 获取并调度任务 func (wp *WorkerPool) fetchAndScheduleTasks() { global.GVA_LOG.Debug("fetchAndScheduleTasks 开始执行") if global.GVA_DB == nil { global.GVA_LOG.Error("数据库连接为空,无法获取任务") return } // 获取所有待处理的任务,但排除已停止的批量工作流的任务和超过重试次数的任务 var tasks []gaia.BatchWorkflowTask err := global.GVA_DB.Table("batch_workflow_tasks_extend bwt"). Select("bwt.*"). Joins("INNER JOIN batch_workflows_extend bw ON bwt.batch_workflow_id = bw.id"). Where("bwt.status = ? AND bw.status != ? AND bwt.error_count < ?", gaia.BatchTaskStatusPending, gaia.BatchWorkflowStatusStopped, gaia.MaxTaskRetryCount). Order("bwt.created_at ASC"). Find(&tasks).Error if err != nil { global.GVA_LOG.Error("获取待处理任务失败: " + err.Error()) return } // 按用户分组任务 userTasks := make(map[uint][]*gaia.BatchWorkflowTask) for i := range tasks { task := &tasks[i] // 获取任务对应的用户ID var batchWorkflow gaia.BatchWorkflow if err = global.GVA_DB.Where("id = ?", task.BatchWorkflowID).First(&batchWorkflow).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("找不到任务 %s 对应的批量工作流 %s: %s", task.ID, task.BatchWorkflowID, err.Error())) continue } userTasks[batchWorkflow.UserID] = append(userTasks[batchWorkflow.UserID], task) } // 在分配任务前,再次清理已停止的批量工作流任务 cleanupStoppedBatchWorkflowTasks() // 为每个用户分配任务到队列 for userID, userTaskList := range userTasks { userWorkerCount := wp.getUserWorkerCount(userID) if userWorkerCount == 0 { continue } // 限制任务数量 if len(userTaskList) > userWorkerCount { userTaskList = userTaskList[:userWorkerCount] } // 获取用户专属的任务队列 wp.userMutex.RLock() userTaskChan, exists := wp.userTaskChan[userID] wp.userMutex.RUnlock() if !exists { continue } // 将任务标记为排队状态并加入队列 for _, task := range userTaskList { // 更新任务状态为排队中 if err = global.GVA_DB.Model(task).Update("status", gaia.BatchTaskStatusQueued).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新任务状态失败: %s", err.Error())) continue } // 非阻塞方式添加到用户专属队列 select { case userTaskChan <- task: //global.GVA_LOG.Info(fmt.Sprintf("成功将任务 %s 添加到用户 %d 的队列", task.ID, userID)) case <-wp.ctx.Done(): return default: // 队列满了,将任务状态改回pending //global.GVA_LOG.Warn(fmt.Sprintf("用户 %d 的队列已满,任务 %s 状态改回pending", userID, task.ID)) global.GVA_DB.Model(task).Update("status", gaia.BatchTaskStatusPending) } } if len(userTaskList) > 0 { global.GVA_LOG.Info(fmt.Sprintf("为用户 %d 调度了 %d 个任务到队列", userID, len(userTaskList))) } } } // processTask 处理单个任务 func (wp *WorkerPool) processTask(task *gaia.BatchWorkflowTask) { // 更新任务状态为运行中 if err := global.GVA_DB.Model(task).Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusRunning, "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新任务状态失败: %s", err.Error())) return } // 获取批量工作流信息 var batchWorkflow gaia.BatchWorkflow if err := global.GVA_DB.Where("id = ?", task.BatchWorkflowID).First(&batchWorkflow).Error; err != nil { wp.updateTaskError(task, "获取批量工作流信息失败: "+err.Error()) return } // 检查批量工作流是否被停止 if batchWorkflow.Status == gaia.BatchWorkflowStatusStopped { wp.updateTaskError(task, "批量工作流已被停止") return } // 解析输入参数 var inputs map[string]string if err := json.Unmarshal([]byte(task.Inputs), &inputs); err != nil { wp.updateTaskError(task, "解析输入参数失败: "+err.Error()) return } // 检查输入参数是否全为空值 hasNonEmptyValue := false for _, value := range inputs { if strings.TrimSpace(value) != "" { hasNonEmptyValue = true break } } // 如果所有输入都为空,跳过处理并标记为完成 if !hasNonEmptyValue { global.GVA_LOG.Info(fmt.Sprintf("任务 %s 包含全空值输入,跳过处理并标记为完成", task.ID)) // 创建空结果并标记为完成 emptyResultJSON, _ := json.Marshal(map[string]interface{}{ "status": gaia.BatchTaskStatusCompleted, "message": "跳过空值输入任务", "outputs": map[string]interface{}{ "text": "输入为空,已跳过处理", }, }) // 更新任务状态为完成 if err := global.GVA_DB.Model(task).Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusCompleted, "result": string(emptyResultJSON), "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新空值任务状态失败: %s", err.Error())) return } // 更新批量处理的已处理行数 global.GVA_DB.Exec("UPDATE batch_workflows_extend SET processed_rows = processed_rows + 1, updated_at = ? WHERE id = ?", time.Now(), batchWorkflow.ID) // 检查批量工作流是否完成 wp.checkBatchWorkflowCompletion(batchWorkflow.ID) return } // 快速生成即时token和CSRF token var err error var token string var csrfToken string var user system.SysUser if err = global.GVA_DB.Where( "id = ? AND enable = ?", batchWorkflow.UserID, system.UserActive).First(&user).Error; err != nil { wp.updateTaskError(task, "用户不存在: "+err.Error()) return } // 生成这个用户的token和CSRF token if token, csrfToken, _, err = utils.LoginTokenWithCSRF(&user); err != nil { wp.updateTaskError(task, "用户token生成失败: "+err.Error()) return } // 调用Dify API result, err := wp.batchService.callDifyAPI(batchWorkflow.InstalledID, token, csrfToken, inputs) if err != nil { // 检查是否是余额不足错误(403状态码) if strings.Contains(err.Error(), "状态码: 403") && strings.Contains( err.Error(), "Insufficient balance") { global.GVA_LOG.Warn(fmt.Sprintf( "用户 %d 余额不足,将其所有pending和processing状态的批量工作流和任务设置为失败", batchWorkflow.UserID)) wp.handleInsufficientBalance(batchWorkflow.UserID, task.BatchWorkflowID) wp.updateTaskError(task, gaia.ErrorInsufficientBalance) return } wp.updateTaskError(task, gaia.ErrorCallAPIFailed+": "+err.Error()) return } // 解析返回结果,检查是否有错误 var apiResult map[string]interface{} if err = json.Unmarshal([]byte(result), &apiResult); err != nil { wp.updateTaskError(task, gaia.ErrorParseResultFailed+": "+err.Error()) return } // 检查API返回的状态 if status, ok := apiResult["status"].(string); ok && status == gaia.BatchTaskStatusFailed { // API执行失败,提取错误信息 var apiError string errorMsg := gaia.ErrorWorkflowFailed if apiError, ok = apiResult["error"].(string); ok && apiError != "" { errorMsg = apiError } // 检查是否是余额不足错误 if strings.Contains(result, "call failed") || strings.Contains( apiError, "Insufficient balance") { global.GVA_LOG.Warn(fmt.Sprintf( "用户 %d 余额不足,将其所有pending和processing状态的批量工作流和任务设置为失败", batchWorkflow.UserID)) wp.handleInsufficientBalance(batchWorkflow.UserID, task.BatchWorkflowID) wp.updateTaskError(task, gaia.ErrorInsufficientBalance) return } // 其他类型的失败,标记为失败状态 wp.updateTaskError(task, errorMsg) return } // API执行成功,更新任务结果 if err = global.GVA_DB.Model(task).Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusCompleted, "result": result, "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新任务结果失败: %s", err.Error())) return } // 更新批量处理的已处理行数 global.GVA_DB.Exec( "UPDATE batch_workflows_extend SET processed_rows = processed_rows + 1, updated_at = ? WHERE id = ?", time.Now(), batchWorkflow.ID) // 检查批量工作流是否完成 wp.checkBatchWorkflowCompletion(batchWorkflow.ID) } // decodeUnicodeEscapes 解码字符串中的 Unicode 转义序列 func decodeUnicodeEscapes(input string) string { // 尝试将字符串作为带引号的字符串进行解码 if decoded, err := strconv.Unquote(`"` + input + `"`); err == nil { return decoded } // 如果直接解码失败,尝试逐个替换 Unicode 转义序列 // 处理类似 \u897f\u73ed\u7259\u7ad9 这样的转义序列 result := input for { // 查找下一个 \u 序列的起始位置 startIdx := strings.Index(result, "\\u") if startIdx == -1 { break } // 检查是否有足够的字符来形成一个完整的 Unicode 转义序列 if startIdx+6 > len(result) { break } // 提取 Unicode 转义序列(包括 \u 和 4 位十六进制数字) unicodeEscape := result[startIdx : startIdx+6] // 尝试解码这个单独的 Unicode 转义序列 if decoded, err := strconv.Unquote(`"` + unicodeEscape + `"`); err == nil { // 替换原字符串中的转义序列 result = result[:startIdx] + decoded + result[startIdx+6:] } else { // 如果解码失败,跳过这个序列,防止无限循环 result = result[:startIdx] + "?" + result[startIdx+6:] } } return result } // updateTaskError 更新任务错误信息 func (wp *WorkerPool) updateTaskError(task *gaia.BatchWorkflowTask, errorMsg string) { // 解码错误信息中的 Unicode 转义序列 decodedErrorMsg := decodeUnicodeEscapes(errorMsg) global.GVA_LOG.Error(fmt.Sprintf("任务 %s 失败: %s", task.ID, decodedErrorMsg)) // 增加错误次数 newErrorCount := task.ErrorCount + 1 // 更新批量工作流的错误次数和错误信息 if err := global.GVA_DB.Exec( "UPDATE batch_workflows_extend SET error_count = error_count + 1, error = ?, updated_at = ? WHERE id = ?", decodedErrorMsg, time.Now(), task.BatchWorkflowID).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新批量工作流错误次数和错误信息失败: %s", err.Error())) } // 检查是否超过最大重试次数 if newErrorCount >= gaia.MaxTaskRetryCount { // 超过重试次数,标记为最终失败 if err := global.GVA_DB.Model(task).Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusFailed, "error": fmt.Sprintf("%s: %s", gaia.ErrorMaxRetryExceeded, decodedErrorMsg), "error_count": newErrorCount, "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新任务最终失败状态失败: %s", err.Error())) } global.GVA_LOG.Warn(fmt.Sprintf("任务 %s 重试次数已达上限(%d次),标记为最终失败", task.ID, gaia.MaxTaskRetryCount)) } else { // 未超过重试次数,重置为pending状态以便重试 if err := global.GVA_DB.Model(task).Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusPending, "error": decodedErrorMsg, "error_count": newErrorCount, "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新任务重试状态失败: %s", err.Error())) } global.GVA_LOG.Info(fmt.Sprintf("任务 %s 第%d次失败,重置为pending状态准备重试", task.ID, newErrorCount)) } // 检查批量工作流状态 wp.checkBatchWorkflowCompletion(task.BatchWorkflowID) } // checkBatchWorkflowCompletion 检查批量工作流是否完成 func (wp *WorkerPool) checkBatchWorkflowCompletion(batchWorkflowID string) { var batchWorkflow gaia.BatchWorkflow if err := global.GVA_DB.Where("id = ?", batchWorkflowID).First( &batchWorkflow).Error; err != nil { return } // 统计任务状态 var pendingCount, queuedCount, runningCount, completedCount, failedCount int64 global.GVA_DB.Model(&gaia.BatchWorkflowTask{}).Where( "batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusPending).Count(&pendingCount) global.GVA_DB.Model(&gaia.BatchWorkflowTask{}).Where( "batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusQueued).Count(&queuedCount) global.GVA_DB.Model(&gaia.BatchWorkflowTask{}).Where( "batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusRunning).Count(&runningCount) global.GVA_DB.Model(&gaia.BatchWorkflowTask{}).Where( "batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusCompleted).Count(&completedCount) global.GVA_DB.Model(&gaia.BatchWorkflowTask{}).Where( "batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusFailed).Count(&failedCount) // 如果所有任务都已完成 if completedCount == int64(batchWorkflow.TotalRows) { // 重置错误计数并更新状态,同时同步 processed_rows 确保数据一致性 if err := global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where("id = ?", batchWorkflowID).Updates(map[string]interface{}{ "status": gaia.BatchWorkflowStatusCompleted, "processed_rows": completedCount, // 同步已处理行数,确保数据一致性 "error": "", // 清空错误信息 "error_count": 0, // 重置错误计数,恢复用户并发位 "updated_at": time.Now(), }).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("更新批量工作流完成状态失败: %s", err.Error())) } else { global.GVA_LOG.Info(fmt.Sprintf("批量工作流 %s 已完成,错误计数已重置,用户 %d 的并发位将恢复", batchWorkflowID, batchWorkflow.UserID)) } } else if pendingCount == 0 && queuedCount == 0 && runningCount == 0 && failedCount > 0 { // 如果没有待处理、排队或运行中的任务,但有失败的任务 // 获取第一个失败任务的错误信息作为代表 var failedTask gaia.BatchWorkflowTask var errorInfo = gaia.ErrorWorkflowFailed if err := global.GVA_DB.Where("batch_workflow_id = ? AND status = ?", batchWorkflowID, gaia.BatchTaskStatusFailed).First(&failedTask).Error; err == nil && failedTask.Error != "" { errorInfo = failedTask.Error } global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where( "id = ?", batchWorkflowID).Updates(map[string]interface{}{ "status": gaia.BatchWorkflowStatusFailed, "error": errorInfo, "updated_at": time.Now(), }) } } // resetAbnormalTasks 重置异常状态的任务 func resetAbnormalTasks() { if global.GVA_DB == nil { global.GVA_LOG.Error("数据库连接为空,无法重置异常任务状态") return } global.GVA_LOG.Info("开始重置异常状态的任务...") // 首先清理已停止的批量工作流中的待处理和排队任务 cleanupStoppedBatchWorkflowTasks() // 重置 running 状态的任务为 pending runningResult := global.GVA_DB.Model(&gaia.BatchWorkflowTask{}). Where("status = ?", gaia.BatchTaskStatusRunning). Update("status", gaia.BatchTaskStatusPending) if runningResult.Error != nil { global.GVA_LOG.Error("重置running状态任务失败: " + runningResult.Error.Error()) } else if runningResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("重置了 %d 个running状态的任务为pending", runningResult.RowsAffected)) } // 重置 queued 状态的任务为 pending queuedResult := global.GVA_DB.Model(&gaia.BatchWorkflowTask{}). Where("status = ?", gaia.BatchTaskStatusQueued). Update("status", gaia.BatchTaskStatusPending) if queuedResult.Error != nil { global.GVA_LOG.Error("重置queued状态任务失败: " + queuedResult.Error.Error()) } else if queuedResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("重置了 %d 个queued状态的任务为pending", queuedResult.RowsAffected)) } // 重置相关批量工作流的状态 // 如果批量工作流状态为 processing 但没有 running 或 queued 的任务,将其重置为 pending var batchWorkflows []gaia.BatchWorkflow err := global.GVA_DB.Where("status = ?", gaia.BatchWorkflowStatusProcessing).Find(&batchWorkflows).Error if err != nil { global.GVA_LOG.Error("查询processing状态的批量工作流失败: " + err.Error()) return } for _, bw := range batchWorkflows { var runningCount int64 global.GVA_DB.Model(&gaia.BatchWorkflowTask{}). Where("batch_workflow_id = ? AND status IN (?)", bw.ID, []string{ gaia.BatchTaskStatusRunning, gaia.BatchTaskStatusQueued}).Count(&runningCount) // 如果没有正在运行或排队的任务,将批量工作流状态重置为 pending if runningCount == 0 { if err = global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where("id = ?", bw.ID).Update( "status", gaia.BatchWorkflowStatusPending).Error; err != nil { global.GVA_LOG.Error(fmt.Sprintf("重置批量工作流 %s 状态失败: %s", bw.ID, err.Error())) } } } } // cleanupStoppedBatchWorkflowTasks 清理已停止的批量工作流中的待处理和排队任务 func cleanupStoppedBatchWorkflowTasks() { // 将已停止的批量工作流中的pending和queued任务标记为cancelled // 使用子查询方式避免JOIN在UPDATE中的别名问题 result := global.GVA_DB.Table("batch_workflow_tasks_extend").Where( "batch_workflow_id IN (?) AND status IN (?)", global.GVA_DB.Table("batch_workflows_extend").Select( "id").Where("status = ?", gaia.BatchWorkflowStatusStopped), []string{ gaia.BatchTaskStatusPending, gaia.BatchTaskStatusQueued}).Update( "status", gaia.BatchTaskStatusCancelled) if result.Error != nil { global.GVA_LOG.Error("清理已停止的批量工作流任务失败: " + result.Error.Error()) return } } // InitWorkerPool 初始化全局工作池 func InitWorkerPool(workers int) { if globalWorkerPool != nil { globalWorkerPool.Stop() } // 重置所有异常状态的任务 resetAbnormalTasks() globalWorkerPool = NewWorkerPool(workers) globalWorkerPool.Start() } // GetWorkerPool 获取全局工作池 func GetWorkerPool() *WorkerPool { return globalWorkerPool } // StopWorkerPool 停止全局工作池 func StopWorkerPool() { if globalWorkerPool != nil { globalWorkerPool.Stop() globalWorkerPool = nil } } // ResetBatchWorkflowErrorCount 重置指定批量工作流的错误计数 func ResetBatchWorkflowErrorCount(batchWorkflowID string) error { if global.GVA_DB == nil { return fmt.Errorf("数据库连接未初始化") } // 获取批量工作流信息 var batchWorkflow gaia.BatchWorkflow if err := global.GVA_DB.Where("id = ?", batchWorkflowID).First(&batchWorkflow).Error; err != nil { return fmt.Errorf("批量工作流不存在: %v", err) } // 重置错误计数 if err := global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where("id = ?", batchWorkflowID).Updates(map[string]interface{}{ "error_count": 0, "updated_at": time.Now(), }).Error; err != nil { return fmt.Errorf("重置错误计数失败: %v", err) } global.GVA_LOG.Info(fmt.Sprintf("批量工作流 %s 的错误计数已手动重置,用户 %d 的并发位将恢复", batchWorkflowID, batchWorkflow.UserID)) return nil } // ResetUserErrorCount 重置指定用户所有批量工作流的错误计数 func ResetUserErrorCount(userID uint) error { if global.GVA_DB == nil { return fmt.Errorf("数据库连接未初始化") } // 重置该用户所有批量工作流的错误计数 result := global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where("user_id = ?", userID).Updates(map[string]interface{}{ "error_count": 0, "updated_at": time.Now(), }) if result.Error != nil { return fmt.Errorf("重置用户错误计数失败: %v", result.Error) } global.GVA_LOG.Info(fmt.Sprintf("用户 %d 的所有批量工作流错误计数已重置,影响 %d 个工作流,并发位将恢复", userID, result.RowsAffected)) return nil } // handleInsufficientBalance 处理余额不足的情况,将用户所有pending和processing状态的工作流和任务设置为失败 // 特别处理同batch_workflow_id的所有任务 func (wp *WorkerPool) handleInsufficientBalance(userID uint, currentBatchWorkflowID string) { if global.GVA_DB == nil { global.GVA_LOG.Error("数据库连接未初始化,无法处理余额不足情况") return } // 优先处理当前batch_workflow_id的所有任务(包括processing状态) currentWorkflowResult := global.GVA_DB.Model(&gaia.BatchWorkflow{}). Where("id = ? AND status IN (?)", currentBatchWorkflowID, []string{gaia.BatchWorkflowStatusPending, gaia.BatchWorkflowStatusProcessing}). Updates(map[string]interface{}{ "status": gaia.BatchWorkflowStatusFailed, "error": gaia.ErrorInsufficientBalance, "updated_at": time.Now(), }) if currentWorkflowResult.Error != nil { global.GVA_LOG.Error(fmt.Sprintf("更新批量工作流 %s 状态失败: %s", currentBatchWorkflowID, currentWorkflowResult.Error.Error())) } else if currentWorkflowResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("已将批量工作流 %s 设置为失败状态", currentBatchWorkflowID)) } // 将当前batch_workflow_id的所有未完成任务设置为失败 currentTaskResult := global.GVA_DB.Table("batch_workflow_tasks_extend"). Where("batch_workflow_id = ? AND status IN (?)", currentBatchWorkflowID, []string{gaia.BatchTaskStatusPending, gaia.BatchTaskStatusQueued, gaia.BatchTaskStatusRunning}). Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusFailed, "error": gaia.ErrorInsufficientBalance, "updated_at": time.Now(), }) if currentTaskResult.Error != nil { global.GVA_LOG.Error(fmt.Sprintf("更新批量工作流 %s 的任务状态失败: %s", currentBatchWorkflowID, currentTaskResult.Error.Error())) } else if currentTaskResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("已将批量工作流 %s 的 %d 个任务设置为失败状态", currentBatchWorkflowID, currentTaskResult.RowsAffected)) } // 将用户其他所有pending状态的批量工作流设置为失败 otherWorkflowResult := global.GVA_DB.Model(&gaia.BatchWorkflow{}). Where("user_id = ? AND id != ? AND status = ?", userID, currentBatchWorkflowID, gaia.BatchWorkflowStatusPending). Updates(map[string]interface{}{ "status": gaia.BatchWorkflowStatusFailed, "error": gaia.ErrorInsufficientBalance, "updated_at": time.Now(), }) if otherWorkflowResult.Error != nil { global.GVA_LOG.Error(fmt.Sprintf("更新用户 %d 其他pending批量工作流状态失败: %s", userID, otherWorkflowResult.Error.Error())) } else if otherWorkflowResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("已将用户 %d 的 %d 个其他pending批量工作流设置为失败状态", userID, otherWorkflowResult.RowsAffected)) } // 将用户其他所有pending状态的批量工作流任务设置为失败 otherTaskResult := global.GVA_DB.Table("batch_workflow_tasks_extend"). Where("batch_workflow_id IN (?) AND batch_workflow_id != ? AND status = ?", global.GVA_DB.Table("batch_workflows_extend").Select("id").Where("user_id = ?", userID), currentBatchWorkflowID, gaia.BatchTaskStatusPending). Updates(map[string]interface{}{ "status": gaia.BatchTaskStatusFailed, "error": gaia.ErrorInsufficientBalance, "updated_at": time.Now(), }) if otherTaskResult.Error != nil { global.GVA_LOG.Error(fmt.Sprintf("更新用户 %d 其他pending批量工作流任务状态失败: %s", userID, otherTaskResult.Error.Error())) } else if otherTaskResult.RowsAffected > 0 { global.GVA_LOG.Info(fmt.Sprintf("已将用户 %d 的 %d 个其他pending批量工作流任务设置为失败状态", userID, otherTaskResult.RowsAffected)) } } // GetBatchWorkflowList 获取最近30天的批量工作流列表 func (s *BatchWorkflowService) GetBatchWorkflowList(userID uint, installedID string, page, limit int) ([]gaia.BatchWorkflow, int64, error) { if global.GVA_DB == nil { return nil, 0, fmt.Errorf("数据库连接未初始化") } // 计算30天前的时间 thirtyDaysAgo := time.Now().AddDate(0, 0, -30) // 构建查询条件 query := global.GVA_DB.Model(&gaia.BatchWorkflow{}). Where("user_id = ? AND created_at >= ?", userID, thirtyDaysAgo) // 如果指定了installedID,则添加该条件 if installedID != "" { query = query.Where("installed_id = ?", installedID) } // 获取总数 var total int64 if err := query.Count(&total).Error; err != nil { return nil, 0, err } // 分页查询 var batchWorkflows []gaia.BatchWorkflow offset := (page - 1) * limit if err := query.Order("created_at DESC"). Limit(limit). Offset(offset). Find(&batchWorkflows).Error; err != nil { return nil, 0, err } // 解码错误信息中的 Unicode 转义序列 for i := range batchWorkflows { if batchWorkflows[i].Error != "" { batchWorkflows[i].Error = decodeUnicodeEscapes(batchWorkflows[i].Error) } } return batchWorkflows, total, nil }