From 8257113c509c264927eb8f8a3a05ce2c2341ecef Mon Sep 17 00:00:00 2001 From: npc0-hue Date: Fri, 27 Mar 2026 14:51:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=89=B9=E5=A4=84=E7=90=86=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/server/model/gaia/batch_workflow.go | 26 +- admin/server/service/gaia/batch_workflow.go | 25 ++ admin/server/service/gaia/worker_pool.go | 353 ++++++++------------ 3 files changed, 180 insertions(+), 224 deletions(-) diff --git a/admin/server/model/gaia/batch_workflow.go b/admin/server/model/gaia/batch_workflow.go index bae18093e..0b54ffba6 100644 --- a/admin/server/model/gaia/batch_workflow.go +++ b/admin/server/model/gaia/batch_workflow.go @@ -32,10 +32,32 @@ const ( // 批量工作流配置常量 const ( - MaxTaskRetryCount = 3 // 最大任务重试次数 - ErrorPenaltyThreshold = 50 // 错误惩罚阈值(每50个错误减少1个并发位) + MaxTaskRetryCount = 3 // 最大任务重试次数 + ErrorPenaltyThreshold = 50 // 错误惩罚阈值(每50个错误减少1个并发位) ) +// 优先级分档阈值(按 total_rows 划分) +const ( + PriorityTier1MaxRows = 300 // 第一优先级:≤300 行 + PriorityTier2MaxRows = 800 // 第二优先级:≤800 行 + PriorityTier3MaxRows = 3000 // 第三优先级:≤3000 行 + // 第四优先级:>3000 行 +) + +// UserWorkerAllocation 用户工作器分配信息 +type UserWorkerAllocation struct { + UserID uint `json:"user_id"` + Workers int `json:"workers"` + MaxLimit int `json:"max_limit"` +} + +// UserErrorInfo 用户错误信息(用于工作器分配计算) +type UserErrorInfo struct { + UserID uint `json:"user_id"` + ErrorCount int `json:"error_count"` + TotalRows int `json:"total_rows"` +} + // BatchWorkflow 批量工作流处理 type BatchWorkflow struct { ID string `json:"id" gorm:"primaryKey;comment:批量处理ID"` diff --git a/admin/server/service/gaia/batch_workflow.go b/admin/server/service/gaia/batch_workflow.go index 96688c7ab..cca7e50fa 100644 --- a/admin/server/service/gaia/batch_workflow.go +++ b/admin/server/service/gaia/batch_workflow.go @@ -28,6 +28,31 @@ func (s *BatchWorkflowService) CreateBatchWorkflow( return nil, fmt.Errorf("数据库连接未初始化") } + // 计算本次上传的有效数据行数(去掉表头和空行) + uploadedDataRows := 0 + if len(fileContent) > 1 { + for _, row := range fileContent[1:] { + for _, v := range row { + if strings.TrimSpace(v) != "" { + uploadedDataRows++ + break + } + } + } + } + + // 检查当前用户 pending 状态队列中是否已有相同文件(文件名 + 行数一致视为重复) + var duplicateCount int64 + if err := global.GVA_DB.Model(&gaia.BatchWorkflow{}).Where( + "user_id = ? AND file_name = ? AND installed_id = ? AND total_rows = ? AND status = ?", + userId, fileName, installedID, uploadedDataRows, gaia.BatchWorkflowStatusPending). + Count(&duplicateCount).Error; err != nil { + return nil, fmt.Errorf("检查重复文件失败: %v", err) + } + if duplicateCount > 0 { + return nil, fmt.Errorf("文件重复上传:当前队列中已存在相同文件(文件名:%s,行数:%d),请勿重复提交", fileName, uploadedDataRows) + } + // 创建批量处理记录 keyByte, _ := json.Marshal(keyNameMapping) batchWorkflow := &gaia.BatchWorkflow{ diff --git a/admin/server/service/gaia/worker_pool.go b/admin/server/service/gaia/worker_pool.go index 1df0b8e4e..5e973162f 100644 --- a/admin/server/service/gaia/worker_pool.go +++ b/admin/server/service/gaia/worker_pool.go @@ -15,13 +15,6 @@ import ( "github.com/flipped-aurora/gin-vue-admin/server/model/gaia" ) -// UserWorkerAllocation 用户工作器分配信息 -type UserWorkerAllocation struct { - UserID uint `json:"user_id"` - Workers int `json:"workers"` - MaxLimit int `json:"max_limit"` -} - // 全局工作池实例 var globalWorkerPool *WorkerPool @@ -29,10 +22,10 @@ var globalWorkerPool *WorkerPool type WorkerPool struct { ctx context.Context cancel context.CancelFunc - totalWorkers int // 总工作器数量 - userWorkers map[uint]*UserWorkerAllocation // 每个用户的工作器分配 - userTaskChan map[uint]chan *gaia.BatchWorkflowTask // 每个用户的任务队列 - runningWorkers map[uint]int // 每个用户当前运行的worker数量 + totalWorkers int // 总工作器数量 + userWorkers map[uint]*gaia.UserWorkerAllocation // 每个用户的工作器分配 + userTaskChan map[uint]chan *gaia.BatchWorkflowTask // 每个用户的任务队列 + runningWorkers map[uint]int // 每个用户当前运行的worker数量 wg sync.WaitGroup batchService *BatchWorkflowService running bool @@ -47,7 +40,7 @@ func NewWorkerPool(totalWorkers int) *WorkerPool { ctx: ctx, cancel: cancel, totalWorkers: totalWorkers, - userWorkers: make(map[uint]*UserWorkerAllocation), + userWorkers: make(map[uint]*gaia.UserWorkerAllocation), userTaskChan: make(map[uint]chan *gaia.BatchWorkflowTask), runningWorkers: make(map[uint]int), batchService: &BatchWorkflowService{}, @@ -101,15 +94,13 @@ func (wp *WorkerPool) calculateUserWorkerAllocation() { return } - // 第二个查询:获取这些用户的所有批量工作流的累计错误次数(不限状态) - type UserErrorInfo struct { - UserID uint `json:"user_id"` - ErrorCount int `json:"error_count"` - } - var userErrorInfos []UserErrorInfo + // 第二个查询:获取这些用户的累计错误次数和最小 total_rows(pending 状态工作流) + var userErrorInfos []gaia.UserErrorInfo if len(activeUserIDs) > 0 { err = global.GVA_DB.Raw(` - SELECT bw.user_id, COALESCE(SUM(bw.error_count), 0) as error_count + SELECT bw.user_id, + COALESCE(SUM(bw.error_count), 0) AS error_count, + COALESCE(SUM(bw.total_rows), 0) AS total_rows FROM batch_workflows_extend bw WHERE bw.user_id IN (?) AND bw.status='pending' GROUP BY bw.user_id @@ -121,10 +112,12 @@ func (wp *WorkerPool) calculateUserWorkerAllocation() { } } - // 提取活跃用户ID列表和错误次数映射 + // 提取活跃用户ID列表、错误次数映射和 total_rows 映射 userErrorMap := make(map[uint]int) + userTotalRowsMap := make(map[uint]int) for _, info := range userErrorInfos { userErrorMap[info.UserID] = info.ErrorCount + userTotalRowsMap[info.UserID] = info.TotalRows } userCount := len(activeUserIDs) @@ -133,7 +126,7 @@ func (wp *WorkerPool) calculateUserWorkerAllocation() { for _, ch := range wp.userTaskChan { close(ch) } - wp.userWorkers = make(map[uint]*UserWorkerAllocation) + wp.userWorkers = make(map[uint]*gaia.UserWorkerAllocation) wp.userTaskChan = make(map[uint]chan *gaia.BatchWorkflowTask) wp.runningWorkers = make(map[uint]int) return @@ -155,226 +148,142 @@ func (wp *WorkerPool) calculateUserWorkerAllocation() { } } - // 检查用户数量是否超过了最大支持数量(每用户最少1个工作器) - maxSupportedUsers := wp.totalWorkers / 1 - // 存储新的分配计算结果 - newAllocations := make(map[uint]*UserWorkerAllocation) + newAllocations := make(map[uint]*gaia.UserWorkerAllocation) - if userCount <= maxSupportedUsers { - // 用户数量在可支持范围内,采用两阶段分配策略 - baseAllocation := wp.totalWorkers / userCount - remainder := wp.totalWorkers % userCount - - // 第一阶段:计算每个用户的基础分配和错误惩罚后的实际分配 - type UserAllocationInfo struct { - UserID uint - BaseWorkers int - ActualWorkers int - ErrorCount int - PenaltyReduced int + // 计算每个用户的优先级权重(total_rows 越小权重越大) + // 权重:tier1=4, tier2=3, tier3=2, tier4=1 + priorityWeightOf := func(totalRows int) int { + switch { + case totalRows <= gaia.PriorityTier1MaxRows: + return 4 + case totalRows <= gaia.PriorityTier2MaxRows: + return 3 + case totalRows <= gaia.PriorityTier3MaxRows: + return 2 + default: + return 1 } + } - var userAllocations []UserAllocationInfo - totalPenaltyReduced := 0 + // 计算所有用户的权重总和,用于按比例分配 worker + totalWeight := 0 + for _, userID := range activeUserIDs { + totalWeight += priorityWeightOf(userTotalRowsMap[userID]) + } + if totalWeight == 0 { + totalWeight = userCount + } - for i, userID := range activeUserIDs { - baseWorkers := baseAllocation - // 处理余数,前几个用户多分配一个 - if i < remainder { - baseWorkers++ - } + // UserAllocationInfo 单用户分配中间计算结构 + type UserAllocationInfo struct { + UserID uint + BaseWorkers int + ActualWorkers int + ErrorCount int + PenaltyReduced int + } - // 确保每个用户至少有1个并发位 - if baseWorkers < 1 { - baseWorkers = 1 - } - - // 应用错误惩罚:根据用户的累计错误次数减少并发位 - errorCount := userErrorMap[userID] - actualWorkers := wp.calculateWorkerCountWithErrorPenalty(baseWorkers, errorCount) - penaltyReduced := baseWorkers - actualWorkers - totalPenaltyReduced += penaltyReduced - userAllocations = append(userAllocations, UserAllocationInfo{ - UserID: userID, - BaseWorkers: baseWorkers, - ActualWorkers: actualWorkers, - ErrorCount: errorCount, - PenaltyReduced: penaltyReduced, - }) + // 按权重比例为每个用户分配基础 worker 数,余数补给权重最高的用户 + userAllocations := make([]UserAllocationInfo, 0, userCount) + allocatedBase := 0 + for _, userID := range activeUserIDs { + w := priorityWeightOf(userTotalRowsMap[userID]) + base := wp.totalWorkers * w / totalWeight + if base < 1 { + base = 1 } + userAllocations = append(userAllocations, UserAllocationInfo{ + UserID: userID, + BaseWorkers: base, + ErrorCount: userErrorMap[userID], + }) + allocatedBase += base + } + // 将剩余 worker 补给权重最大的用户(已按 activeUserIDs 顺序,这里找最大权重的) + remainder := wp.totalWorkers - allocatedBase + if remainder > 0 { + // 找权重最大的用户索引 + maxW, maxIdx := 0, 0 + for i, alloc := range userAllocations { + w := priorityWeightOf(userTotalRowsMap[alloc.UserID]) + if w > maxW { + maxW, maxIdx = w, i + } + } + userAllocations[maxIdx].BaseWorkers += remainder + } - // 第二阶段:将空出来的并发位重新分配给错误较少的用户 - if totalPenaltyReduced > 0 { - // 按错误数量排序,错误少的用户优先获得额外分配 - for i := 0; i < len(userAllocations)-1; i++ { - for j := i + 1; j < len(userAllocations); j++ { - if userAllocations[i].ErrorCount > userAllocations[j].ErrorCount { - userAllocations[i], userAllocations[j] = userAllocations[j], userAllocations[i] + // 应用错误惩罚并收集被释放的 worker + totalPenaltyReduced := 0 + for i := range userAllocations { + actual := wp.calculateWorkerCountWithErrorPenalty( + userAllocations[i].BaseWorkers, userAllocations[i].ErrorCount) + userAllocations[i].PenaltyReduced = userAllocations[i].BaseWorkers - actual + userAllocations[i].ActualWorkers = actual + totalPenaltyReduced += userAllocations[i].PenaltyReduced + } + + // 将惩罚释放的 worker 重新分配给无惩罚用户(按权重优先) + if totalPenaltyReduced > 0 { + // 按权重降序、错误数升序排序,优先分配给高优先级无惩罚用户 + for i := 0; i < len(userAllocations)-1; i++ { + for j := i + 1; j < len(userAllocations); j++ { + wi := priorityWeightOf(userTotalRowsMap[userAllocations[i].UserID]) + wj := priorityWeightOf(userTotalRowsMap[userAllocations[j].UserID]) + if wi < wj || (wi == wj && userAllocations[i].ErrorCount > userAllocations[j].ErrorCount) { + userAllocations[i], userAllocations[j] = userAllocations[j], userAllocations[i] + } + } + } + remainingToDistribute := totalPenaltyReduced + eligibleUsers := 0 + for _, a := range userAllocations { + if a.PenaltyReduced == 0 { + eligibleUsers++ + } + } + if eligibleUsers > 0 { + for i := 0; i < len(userAllocations) && remainingToDistribute > 0; i++ { + if userAllocations[i].PenaltyReduced == 0 { + extra := remainingToDistribute / eligibleUsers + if extra < 1 { + extra = 1 } - } - } - - // 只为没有被惩罚的用户(PenaltyReduced = 0)重新分配空闲的并发位 - // 被惩罚的用户不应该获得额外分配 - remainingToDistribute := totalPenaltyReduced - eligibleUsers := 0 - - // 计算有资格获得额外分配的用户数量(没有被惩罚的用户) - for _, allocation := range userAllocations { - if allocation.PenaltyReduced == 0 { - eligibleUsers++ - } - } - - if eligibleUsers > 0 { - // 只为没有被惩罚的用户分配额外的并发位 - for i := 0; i < len(userAllocations) && remainingToDistribute > 0; i++ { - if userAllocations[i].PenaltyReduced == 0 { - // 为没有错误惩罚的用户分配额外的并发位 - extraWorkers := remainingToDistribute / eligibleUsers - if extraWorkers < 1 { - extraWorkers = 1 - } - if extraWorkers > remainingToDistribute { - extraWorkers = remainingToDistribute - } - - userAllocations[i].ActualWorkers += extraWorkers - remainingToDistribute -= extraWorkers - eligibleUsers-- + if extra > remainingToDistribute { + extra = remainingToDistribute } + userAllocations[i].ActualWorkers += extra + remainingToDistribute -= extra + eligibleUsers-- } } } + } - // 创建最终分配结果 - totalFinalWorkers := 0 - for _, allocation := range userAllocations { - newAllocations[allocation.UserID] = &UserWorkerAllocation{ + // 写入最终分配,超出总量时截断(降级场景) + allocatedWorkers := 0 + for _, allocation := range userAllocations { + workers := allocation.ActualWorkers + remaining := wp.totalWorkers - allocatedWorkers + if workers > remaining { + workers = remaining + } + if workers > 0 { + newAllocations[allocation.UserID] = &gaia.UserWorkerAllocation{ UserID: allocation.UserID, - Workers: allocation.ActualWorkers, + Workers: workers, MaxLimit: wp.totalWorkers, } - totalFinalWorkers += allocation.ActualWorkers + allocatedWorkers += workers } - } else { - // 用户数量超过最大支持数量,采用降级分配策略(两阶段分配) - baseAllocation := wp.totalWorkers / userCount - remainder := wp.totalWorkers % userCount - - // 第一阶段:计算每个用户的基础分配和错误惩罚后的实际分配 - type UserAllocationInfo struct { - UserID uint - BaseWorkers int - ActualWorkers int - ErrorCount int - PenaltyReduced int + if allocatedWorkers >= wp.totalWorkers { + break } - - var userAllocations []UserAllocationInfo - totalPenaltyReduced := 0 - - for i, userID := range activeUserIDs { - baseWorkers := baseAllocation - // 处理余数,前几个用户多分配一个 - if i < remainder { - baseWorkers++ - } - - // 确保至少分配1个工作器 - if baseWorkers < 1 { - baseWorkers = 1 - } - - // 应用错误惩罚:根据用户的累计错误次数减少并发位 - errorCount := userErrorMap[userID] - actualWorkers := wp.calculateWorkerCountWithErrorPenalty(baseWorkers, errorCount) - penaltyReduced := baseWorkers - actualWorkers - totalPenaltyReduced += penaltyReduced - - // 添加详细的错误惩罚计算调试日志 - userAllocations = append(userAllocations, UserAllocationInfo{ - UserID: userID, - BaseWorkers: baseWorkers, - ActualWorkers: actualWorkers, - ErrorCount: errorCount, - PenaltyReduced: penaltyReduced, - }) - } - - // 第二阶段:将空出来的并发位重新分配给错误较少的用户 - if totalPenaltyReduced > 0 { - // 按错误数量排序,错误少的用户优先获得额外分配 - for i := 0; i < len(userAllocations)-1; i++ { - for j := i + 1; j < len(userAllocations); j++ { - if userAllocations[i].ErrorCount > userAllocations[j].ErrorCount { - userAllocations[i], userAllocations[j] = userAllocations[j], userAllocations[i] - } - } - } - - // 只为没有被惩罚的用户(PenaltyReduced = 0)重新分配空闲的并发位 - // 被惩罚的用户不应该获得额外分配 - remainingToDistribute := totalPenaltyReduced - eligibleUsers := 0 - - // 计算有资格获得额外分配的用户数量(没有被惩罚的用户) - for _, allocation := range userAllocations { - if allocation.PenaltyReduced == 0 { - eligibleUsers++ - } - } - - if eligibleUsers > 0 { - // 只为没有被惩罚的用户分配额外的并发位 - for i := 0; i < len(userAllocations) && remainingToDistribute > 0; i++ { - if userAllocations[i].PenaltyReduced == 0 { - // 为没有错误惩罚的用户分配额外的并发位 - extraWorkers := remainingToDistribute / eligibleUsers - if extraWorkers < 1 { - extraWorkers = 1 - } - if extraWorkers > remainingToDistribute { - extraWorkers = remainingToDistribute - } - - userAllocations[i].ActualWorkers += extraWorkers - remainingToDistribute -= extraWorkers - eligibleUsers-- - } - } - } - } - - // 创建最终分配结果,确保不超过总工作器数量 - allocatedWorkers := 0 - for _, allocation := range userAllocations { - workers := allocation.ActualWorkers - - // 确保不会超过剩余的工作器数量 - remainingWorkers := wp.totalWorkers - allocatedWorkers - if workers > remainingWorkers { - workers = remainingWorkers - } - - if workers > 0 { - newAllocations[allocation.UserID] = &UserWorkerAllocation{ - UserID: allocation.UserID, - Workers: workers, - MaxLimit: wp.totalWorkers, - } - allocatedWorkers += workers - } - - // 如果工作器已经分配完毕,剩余用户分配0个工作器 - if allocatedWorkers >= wp.totalWorkers { - break - } - } - - global.GVA_LOG.Warn(fmt.Sprintf("降级分配完成 - 总工作器: %d, 用户数: %d, 已分配: %d, 重新分配: %d, 平均每用户: %.1f个", - wp.totalWorkers, userCount, allocatedWorkers, totalPenaltyReduced, float64(allocatedWorkers)/float64(userCount))) + } + if userCount > wp.totalWorkers { + global.GVA_LOG.Warn(fmt.Sprintf("降级分配完成 - 总工作器: %d, 用户数: %d, 已分配: %d, 平均每用户: %.1f个", + wp.totalWorkers, userCount, allocatedWorkers, float64(allocatedWorkers)/float64(userCount))) } // 应用新的分配,只更新有变化的用户