// Source file: dify-plus/admin/server/api/v1/gaia/workflow.go (Go, 670 lines, 20 KiB)

package gaia
import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sort"
	"strconv"
	"strings"

	"github.com/flipped-aurora/gin-vue-admin/server/global"
	"github.com/flipped-aurora/gin-vue-admin/server/model/common/response"
	"github.com/flipped-aurora/gin-vue-admin/server/model/gaia"
	"github.com/flipped-aurora/gin-vue-admin/server/model/gaia/request"
	"github.com/flipped-aurora/gin-vue-admin/server/service"
	gaiaService "github.com/flipped-aurora/gin-vue-admin/server/service/gaia"
	"github.com/flipped-aurora/gin-vue-admin/server/utils"
	"github.com/gin-gonic/gin"
	"golang.org/x/text/encoding/simplifiedchinese"
	"golang.org/x/text/transform"
)
// BatchWorkflowApi groups the HTTP handlers for CSV-driven batch workflow runs
// and for managing the worker pool that executes them.
type BatchWorkflowApi struct{}

var (
	// batchWorkflowService is the service-layer entry point most handlers in
	// this file delegate to.
	batchWorkflowService = service.ServiceGroupApp.GaiaServiceGroup.BatchWorkflowService
)
// CreateBatchWorkflow creates a batch workflow for the current user from an
// uploaded CSV file.
//
// Form fields:
//   - installed_id (required): forwarded to the service together with the
//     caller's user ID.
//   - key_name_mapping (optional): JSON object with string values; the request
//     is rejected if it does not decode into map[string]string.
//   - file (required): the CSV upload, decoded by parseUploadedCSV.
//
// An upload that parses to zero rows is rejected before reaching the service.
//
// @Tags BatchWorkflow
// @Summary 创建批量处理工作流
// @Description 上传CSV文件并创建批量处理工作流
// @Accept multipart/form-data
// @Produce application/json
// @Param file formData file true "CSV文件"
// @Param installed_id formData string true "安装的应用ID"
// @Param key_name_mapping formData string false "JSON object (string values) forwarded to the service"
// @Success 200 {object} response.Response{data=gaia.BatchWorkflow} "成功"
// @Router /gaia/workflow/batch/processing [post]
func (api *BatchWorkflowApi) CreateBatchWorkflow(c *gin.Context) {
	userID := utils.GetUserID(c)
	installedID := c.PostForm("installed_id")
	keyNameMappingStr := c.PostForm("key_name_mapping")
	if installedID == "" {
		response.FailWithMessage("缺少必要参数", c)
		return
	}

	var keyNameMapping map[string]string
	if keyNameMappingStr != "" {
		if err := json.Unmarshal([]byte(keyNameMappingStr), &keyNameMapping); err != nil {
			response.FailWithMessage("解析key_name_mapping失败: "+err.Error(), c)
			return
		}
	}

	file, err := c.FormFile("file")
	if err != nil {
		response.FailWithMessage("获取文件失败: "+err.Error(), c)
		return
	}
	src, err := file.Open()
	if err != nil {
		response.FailWithMessage("打开文件失败: "+err.Error(), c)
		return
	}
	defer src.Close()

	content, err := io.ReadAll(src)
	if err != nil {
		response.FailWithMessage("读取文件内容失败: "+err.Error(), c)
		return
	}

	data, err := parseUploadedCSV(content)
	if err != nil {
		response.FailWithMessage("解析CSV文件失败,请检查文件格式。错误详情: "+err.Error(), c)
		return
	}
	// An empty (or BOM-only) file parses without error but has nothing to run.
	if len(data) == 0 {
		response.FailWithMessage("CSV文件为空", c)
		return
	}

	batchWorkflow, err := batchWorkflowService.CreateBatchWorkflow(
		userID, installedID, file.Filename, data, keyNameMapping)
	if err != nil {
		// A database that is not yet initialised gets a friendlier message.
		if strings.Contains(err.Error(), "数据库连接未初始化") {
			response.FailWithMessage("系统初始化中,请稍后重试", c)
		} else {
			response.FailWithMessage("创建批量处理失败: "+err.Error(), c)
		}
		return
	}
	response.OkWithData(batchWorkflow, c)
}

// parseUploadedCSV decodes and parses an uploaded CSV payload.
//
// A leading UTF-8 BOM is removed first so it does not end up glued to the
// first header cell. The payload is then parsed as UTF-8. If that fails, or
// containsGarbledText flags the first rows, it is re-decoded as GBK and then
// as GB18030. If the last attempt still errors, the naive line/comma splitter
// parseCSVWithFallback is applied to the (undecoded) bytes.
func parseUploadedCSV(content []byte) ([][]string, error) {
	content = bytes.TrimPrefix(content, []byte("\xef\xbb\xbf"))

	data, err := readLenientCSV(bytes.NewReader(content))
	fallbackDecoders := []transform.Transformer{
		simplifiedchinese.GBK.NewDecoder(),
		simplifiedchinese.GB18030.NewDecoder(),
	}
	for _, decoder := range fallbackDecoders {
		if err == nil && !containsGarbledText(data) {
			break
		}
		data, err = readLenientCSV(transform.NewReader(bytes.NewReader(content), decoder))
	}
	if err != nil {
		return parseCSVWithFallback(content)
	}
	return data, nil
}

// readLenientCSV reads every record from r with a forgiving csv.Reader: stray
// quotes are tolerated, leading spaces are trimmed, and rows may have differing
// field counts. Accepting ragged rows here keeps proper quote handling and
// charset decoding instead of dropping to the naive fallback splitter.
func readLenientCSV(r io.Reader) ([][]string, error) {
	reader := csv.NewReader(r)
	reader.LazyQuotes = true
	reader.TrimLeadingSpace = true
	reader.FieldsPerRecord = -1
	return reader.ReadAll()
}
// GetBatchWorkflow returns the batch workflow identified by the "id" path
// parameter, or a failure response carrying the service error.
//
// @Tags BatchWorkflow
// @Summary 获取批量处理信息
// @Description 根据ID获取批量处理信息
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response{data=gaia.BatchWorkflow} "成功"
// @Router /gaia/workflow/batch/{id} [get]
func (api *BatchWorkflowApi) GetBatchWorkflow(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	workflow, lookupErr := batchWorkflowService.GetBatchWorkflow(batchID)
	if lookupErr == nil {
		response.OkWithData(workflow, c)
		return
	}
	response.FailWithMessage("获取批量处理信息失败: "+lookupErr.Error(), c)
}
// GetBatchWorkflowTasks returns the task list of the batch workflow identified
// by the "id" path parameter.
//
// @Tags BatchWorkflow
// @Summary 获取批量处理任务列表
// @Description 根据批量处理ID获取任务列表
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response{data=[]gaia.BatchWorkflowTask} "成功"
// @Router /gaia/workflow/batch/{id}/tasks [get]
func (api *BatchWorkflowApi) GetBatchWorkflowTasks(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	taskList, listErr := batchWorkflowService.GetBatchWorkflowTasks(batchID)
	if listErr == nil {
		response.OkWithData(taskList, c)
		return
	}
	response.FailWithMessage("获取任务列表失败: "+listErr.Error(), c)
}
// GetBatchWorkflowProgress returns whatever progress information the service
// reports for the batch workflow identified by the "id" path parameter.
//
// @Tags BatchWorkflow
// @Summary 获取批量处理进度
// @Description 根据ID获取批量处理进度信息
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response{data=map[string]interface{}} "成功"
// @Router /gaia/workflow/batch/{id}/progress [get]
func (api *BatchWorkflowApi) GetBatchWorkflowProgress(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	snapshot, progressErr := batchWorkflowService.GetBatchWorkflowProgress(batchID)
	if progressErr == nil {
		response.OkWithData(snapshot, c)
		return
	}
	response.FailWithMessage("获取进度信息失败: "+progressErr.Error(), c)
}
// StopBatchWorkflow asks the service to stop the batch workflow identified by
// the "id" path parameter.
//
// @Tags BatchWorkflow
// @Summary 停止批量处理
// @Description 根据ID停止批量处理
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/{id}/stop [post]
func (api *BatchWorkflowApi) StopBatchWorkflow(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	if stopErr := batchWorkflowService.StopBatchWorkflow(batchID); stopErr != nil {
		response.FailWithMessage("停止批量处理失败: "+stopErr.Error(), c)
		return
	}
	response.OkWithMessage("停止成功", c)
}
// RetryBatchWorkflow asks the service to retry the whole batch workflow
// identified by the "id" path parameter, restarting all of its tasks.
//
// @Tags BatchWorkflow
// @Summary 重试批量处理
// @Description 根据ID重试批量处理,重置所有任务从头开始
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/{id}/retry [post]
func (api *BatchWorkflowApi) RetryBatchWorkflow(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	if retryErr := batchWorkflowService.RetryBatchWorkflow(batchID); retryErr != nil {
		response.FailWithMessage("重试批量处理失败: "+retryErr.Error(), c)
		return
	}
	response.OkWithMessage("重试成功,所有任务已重置", c)
}
// RetryFailedTasks asks the service to retry only the failed tasks of the batch
// workflow identified by the "id" path parameter, keeping completed ones.
//
// @Tags BatchWorkflow
// @Summary 仅重试失败的任务
// @Description 根据ID仅重试失败的任务,保留已完成的任务
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/{id}/retry-failed [post]
func (api *BatchWorkflowApi) RetryFailedTasks(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	if retryErr := batchWorkflowService.RetryFailedTasks(batchID); retryErr != nil {
		response.FailWithMessage("重试失败任务失败: "+retryErr.Error(), c)
		return
	}
	response.OkWithMessage("失败任务重试成功", c)
}
// ResumeBatchWorkflow asks the service to resume the batch workflow identified
// by the "id" path parameter.
//
// @Tags BatchWorkflow
// @Summary 恢复批量处理
// @Description 根据ID恢复批量处理
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/{id}/resume [post]
func (api *BatchWorkflowApi) ResumeBatchWorkflow(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	if resumeErr := batchWorkflowService.ResumeBatchWorkflow(batchID); resumeErr != nil {
		response.FailWithMessage("恢复批量处理失败: "+resumeErr.Error(), c)
		return
	}
	response.OkWithMessage("恢复成功", c)
}
// ResetBatchWorkflowErrorCount resets the error counter of the batch workflow
// identified by the "id" path parameter. Per the API description this restores
// the user's concurrency slot; the actual effect lives in the gaia service.
//
// @Tags BatchWorkflow
// @Summary 重置批量工作流错误计数
// @Description 重置指定批量工作流的错误计数,恢复用户并发位
// @Produce application/json
// @Param id path string true "批量处理ID"
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/{id}/reset-error-count [post]
func (api *BatchWorkflowApi) ResetBatchWorkflowErrorCount(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	// Delegates to the reset function of the worker pool in the gaia service package.
	if resetErr := gaiaService.ResetBatchWorkflowErrorCount(batchID); resetErr != nil {
		response.FailWithMessage("重置错误计数失败: "+resetErr.Error(), c)
		return
	}
	response.OkWithMessage("错误计数已重置,用户并发位将恢复", c)
}
// ResetUserErrorCount resets the error counters of every batch workflow that
// belongs to the calling user (taken from the request context).
//
// @Tags BatchWorkflow
// @Summary 重置用户所有批量工作流错误计数
// @Description 重置指定用户所有批量工作流的错误计数,恢复用户并发位
// @Produce application/json
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/batch/reset-user-error-count [post]
func (api *BatchWorkflowApi) ResetUserErrorCount(c *gin.Context) {
	// Delegates to the reset function of the worker pool in the gaia service package.
	if resetErr := gaiaService.ResetUserErrorCount(utils.GetUserID(c)); resetErr != nil {
		response.FailWithMessage("重置用户错误计数失败: "+resetErr.Error(), c)
		return
	}
	response.OkWithMessage("用户所有批量工作流错误计数已重置,并发位将恢复", c)
}
// DownloadBatchWorkflowResults sends the results of the batch workflow
// identified by the "id" path parameter as a CSV attachment named
// batch_results_<id>.csv.
//
// The body is generateCSVFromTasks' output prefixed with a UTF-8 BOM so Excel
// detects the encoding and renders Chinese text correctly; caching is disabled.
//
// @Tags BatchWorkflow
// @Summary 下载批量处理结果
// @Description 根据ID下载批量处理结果
// @Produce text/csv
// @Param id path string true "批量处理ID"
// @Success 200 {file} file "CSV文件"
// @Router /gaia/workflow/batch/{id}/download [get]
func (api *BatchWorkflowApi) DownloadBatchWorkflowResults(c *gin.Context) {
	batchID := c.Param("id")
	if len(batchID) == 0 {
		response.FailWithMessage("缺少批量处理ID", c)
		return
	}

	flow, flowErr := batchWorkflowService.GetBatchWorkflow(batchID)
	if flowErr != nil {
		response.FailWithMessage("获取批量处理信息失败: "+flowErr.Error(), c)
		return
	}
	tasks, tasksErr := batchWorkflowService.GetBatchWorkflowTasks(batchID)
	if tasksErr != nil {
		response.FailWithMessage("获取任务列表失败: "+tasksErr.Error(), c)
		return
	}

	const csvContentType = "text/csv; charset=utf-8"

	var payload bytes.Buffer
	payload.Write([]byte{0xEF, 0xBB, 0xBF}) // UTF-8 BOM so Excel picks the right encoding
	payload.WriteString(generateCSVFromTasks(flow, tasks))
	body := payload.Bytes()

	c.Header("Content-Type", csvContentType)
	c.Header("Content-Disposition", "attachment; filename*=UTF-8''batch_results_"+batchID+".csv")
	c.Header("Content-Length", strconv.Itoa(len(body)))
	c.Header("Cache-Control", "no-cache, no-store, must-revalidate")
	c.Header("Pragma", "no-cache")
	c.Header("Expires", "0")
	c.Data(http.StatusOK, csvContentType, body)
}
// parseCSVWithFallback is a last-resort parser for CSV payloads that
// encoding/csv could not read.
//
// It splits the raw bytes on '\n' and each line on ','. It trims surrounding
// whitespace from every field, which also drops the '\r' of CRLF line endings.
// It then strips one pair of enclosing double quotes and un-doubles any ""
// inside. Blank lines are skipped and no character-set conversion is done.
//
// Quoted fields containing commas or line breaks are NOT supported and will be
// split apart.
//
// A leading UTF-8 BOM is removed, because strings.TrimSpace does not treat
// U+FEFF as space and it would otherwise stick to the first cell.
//
// It returns an error when the payload contains no non-blank line.
func parseCSVWithFallback(content []byte) ([][]string, error) {
	text := strings.TrimPrefix(string(content), "\ufeff")

	var result [][]string
	for _, line := range strings.Split(text, "\n") {
		if strings.TrimSpace(line) == "" {
			continue
		}
		fields := strings.Split(line, ",")
		for j, field := range fields {
			field = strings.TrimSpace(field)
			if len(field) >= 2 && field[0] == '"' && field[len(field)-1] == '"' {
				field = strings.ReplaceAll(field[1:len(field)-1], `""`, `"`)
			}
			fields[j] = field
		}
		result = append(result, fields)
	}

	if len(result) == 0 {
		return nil, fmt.Errorf("CSV文件为空或格式无法识别")
	}
	return result, nil
}
// containsGarbledText reports whether the first rows of parsed CSV data look
// like they were decoded with the wrong character set.
//
// Only the first three rows are inspected. A cell counts as garbled when:
//   - it contains invalid UTF-8 or the replacement character U+FFFD. This is
//     how GBK bytes look after being read as UTF-8.
//   - it contains U+013F ("Ŀ"). The GBK encoding of "目" is 0xC4 0xBF, which
//     happens to be valid UTF-8 for U+013F, so GBK text read as UTF-8 commonly
//     shows it.
func containsGarbledText(data [][]string) bool {
	rows := data
	if len(rows) > 3 {
		rows = rows[:3]
	}
	for _, row := range rows {
		for _, cell := range row {
			// ContainsRune with U+FFFD (utf8.RuneError) also matches any invalid
			// UTF-8 byte sequence, not just a literal U+FFFD.
			if strings.ContainsRune(cell, '\uFFFD') || strings.ContainsRune(cell, '\u013F') {
				return true
			}
		}
	}
	return false
}
// generateCSVFromTasks renders a batch workflow's tasks as CSV text.
//
// Columns, in order:
//   - one per entry of flow.KeyName, in the order the entries appear in that
//     JSON object. The entry's key is the header cell; its value names the
//     field read from each task's Inputs JSON.
//   - "生成结果": the task's outputs.
//   - "报错信息": the task's error message.
//
// Every data row has exactly as many cells as the header. The "生成结果" cell
// concatenates every output except "task_id" whose value is a string or
// number, each followed by "\r", in ascending key order so repeated downloads
// are identical. Other value types are omitted, and a Result that is not valid
// JSON leaves the cell empty. Tasks whose Inputs are not a JSON object of
// strings are skipped. Returns "" when there are no tasks.
func generateCSVFromTasks(flow *gaia.BatchWorkflow, tasks []gaia.BatchWorkflowTask) string {
	if len(tasks) == 0 {
		return ""
	}

	// A missing or unparsable mapping yields only the two result columns.
	var headers, inputNames []string
	if flow != nil {
		headers, inputNames = decodeOrderedStringObject([]byte(flow.KeyName))
	}
	headers = append(headers, "生成结果", "报错信息")

	buf := &bytes.Buffer{}
	w := csv.NewWriter(buf)
	// Writing into a bytes.Buffer cannot fail, so csv.Writer errors are ignored.
	_ = w.Write(headers)

	for _, task := range tasks {
		var inputs map[string]string
		if err := json.Unmarshal([]byte(task.Inputs), &inputs); err != nil {
			continue
		}

		row := make([]string, 0, len(headers))
		for _, name := range inputNames {
			row = append(row, inputs[name])
		}

		text := ""
		var result request.WorkflowBatchProcessing
		if err := json.Unmarshal([]byte(task.Result), &result); err == nil {
			keys := make([]string, 0, len(result.Outputs))
			for key := range result.Outputs {
				if key != "task_id" {
					keys = append(keys, key)
				}
			}
			sort.Strings(keys) // map order is random; keep the cell stable
			var sb strings.Builder
			for _, key := range keys {
				switch v := result.Outputs[key].(type) {
				case string:
					sb.WriteString(v)
					sb.WriteByte('\r')
				case float64:
					sb.WriteString(strconv.FormatFloat(v, 'f', -1, 64))
					sb.WriteByte('\r')
				case int64:
					sb.WriteString(strconv.FormatInt(v, 10))
					sb.WriteByte('\r')
				}
			}
			text = sb.String()
		}

		// Always emit the error cell (possibly empty) so rows are not ragged.
		row = append(row, text, task.Error)
		_ = w.Write(row)
	}
	w.Flush()
	return buf.String()
}

// decodeOrderedStringObject decodes a flat JSON object and returns its keys and
// string values in document order, which json.Unmarshal into a map would lose.
// Entries whose value is not a JSON string are skipped. A repeated key keeps
// its first position but takes the last value, as a map would. Empty or
// malformed input yields nil slices.
func decodeOrderedStringObject(raw []byte) (keys, values []string) {
	dec := json.NewDecoder(bytes.NewReader(raw))
	if tok, err := dec.Token(); err != nil || tok != json.Delim('{') {
		return nil, nil
	}
	position := make(map[string]int)
	for dec.More() {
		tok, err := dec.Token()
		if err != nil {
			return nil, nil
		}
		key, ok := tok.(string)
		if !ok {
			return nil, nil
		}
		var value interface{}
		if err := dec.Decode(&value); err != nil {
			return nil, nil
		}
		s, ok := value.(string)
		if !ok {
			continue
		}
		if i, seen := position[key]; seen {
			values[i] = s
			continue
		}
		position[key] = len(keys)
		keys = append(keys, key)
		values = append(values, s)
	}
	return keys, values
}
// GetWorkerPoolStatus returns the status reported by the batch workflow worker
// pool, or a failure response when no pool has been created.
//
// @Tags BatchWorkflow
// @Summary 获取工作池状态
// @Description 获取当前工作池的运行状态和统计信息
// @Accept application/json
// @Produce application/json
// @Success 200 {object} response.Response{data=map[string]interface{}} "成功"
// @Router /gaia/workflow/worker-pool/status [get]
func (api *BatchWorkflowApi) GetWorkerPoolStatus(c *gin.Context) {
	if pool := batchWorkflowService.GetWorkerPool(); pool != nil {
		response.OkWithData(pool.GetStatus(), c)
		return
	}
	response.FailWithMessage("工作池未初始化", c)
}
// RestartWorkerPool stops the current batch workflow worker pool and starts a
// new one.
//
// The worker count is taken from the optional "workers" query parameter when it
// parses as an integer in [1, 20]. A missing, malformed or out-of-range value
// silently falls back to global.GVA_CONFIG.System.WorkFlowNumber; there is no
// fixed default of 5.
//
// @Tags BatchWorkflow
// @Summary 重启工作池
// @Description 停止当前工作池并重新启动
// @Accept application/json
// @Produce application/json
// @Param workers query int false "worker数量" minimum(1) maximum(20)
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/worker-pool/restart [post]
func (api *BatchWorkflowApi) RestartWorkerPool(c *gin.Context) {
	workers := global.GVA_CONFIG.System.WorkFlowNumber
	if workersParam := c.Query("workers"); workersParam != "" {
		if w, err := strconv.Atoi(workersParam); err == nil && w > 0 && w <= 20 {
			workers = w
		}
	}
	// Stop the running pool before starting a replacement.
	batchWorkflowService.StopWorkerPool()
	batchWorkflowService.InitWorkerPool(workers)
	response.OkWithMessage("工作池重启成功", c)
}
// StopWorkerPool stops the batch workflow worker pool and always reports
// success, since the service call returns no error.
//
// @Tags BatchWorkflow
// @Summary 停止工作池
// @Description 停止当前工作池
// @Accept application/json
// @Produce application/json
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/worker-pool/stop [post]
func (api *BatchWorkflowApi) StopWorkerPool(c *gin.Context) {
	svc := batchWorkflowService
	svc.StopWorkerPool()
	response.OkWithMessage("工作池已停止", c)
}
// StartWorkerPool starts the batch workflow worker pool.
//
// The worker count is taken from the optional "workers" query parameter when it
// parses as an integer in [1, 20]. A missing, malformed or out-of-range value
// silently falls back to global.GVA_CONFIG.System.WorkFlowNumber; there is no
// fixed default of 5.
//
// NOTE(review): this handler does not stop an already-running pool first; what
// happens in that case depends on InitWorkerPool — confirm.
//
// @Tags BatchWorkflow
// @Summary 启动工作池
// @Description 启动工作池
// @Accept application/json
// @Produce application/json
// @Param workers query int false "worker数量" minimum(1) maximum(20)
// @Success 200 {object} response.Response "成功"
// @Router /gaia/workflow/worker-pool/start [post]
func (api *BatchWorkflowApi) StartWorkerPool(c *gin.Context) {
	workers := global.GVA_CONFIG.System.WorkFlowNumber
	if workersParam := c.Query("workers"); workersParam != "" {
		if w, err := strconv.Atoi(workersParam); err == nil && w > 0 && w <= 20 {
			workers = w
		}
	}
	batchWorkflowService.InitWorkerPool(workers)
	response.OkWithMessage("工作池启动成功", c)
}
// GetBatchWorkflowList returns one page of the calling user's batch workflows,
// optionally filtered by the "installed_id" query parameter. Per the API
// description the service limits results to the last 30 days.
//
// Paging: "page" defaults to 1 and must be positive. "limit" defaults to 10
// and must be in [1, 100]. Invalid values fall back to the defaults. The
// response adds total_pages and has_more derived from the service's total.
//
// @Tags BatchWorkflow
// @Summary 获取最近30天的批量工作流列表
// @Description 获取指定用户最近30天的批量工作流列表,支持分页和按应用过滤
// @Accept application/json
// @Produce application/json
// @Param installed_id query string false "安装的应用ID"
// @Param page query int false "页码" default(1)
// @Param limit query int false "每页数量" default(10)
// @Success 200 {object} response.Response{data=map[string]interface{}} "成功"
// @Router /gaia/workflow/batch/list [get]
func (api *BatchWorkflowApi) GetBatchWorkflowList(c *gin.Context) {
	const (
		defaultPage  = 1
		defaultLimit = 10
		maxLimit     = 100
	)

	userID := utils.GetUserID(c)
	installedID := c.Query("installed_id")

	page, limit := defaultPage, defaultLimit
	// strconv.Atoi rejects "", so an absent parameter keeps its default.
	if p, convErr := strconv.Atoi(c.Query("page")); convErr == nil && p > 0 {
		page = p
	}
	if l, convErr := strconv.Atoi(c.Query("limit")); convErr == nil && l > 0 && l <= maxLimit {
		limit = l
	}

	batchWorkflows, total, listErr := batchWorkflowService.GetBatchWorkflowList(userID, installedID, page, limit)
	if listErr != nil {
		response.FailWithMessage("获取批量工作流列表失败: "+listErr.Error(), c)
		return
	}

	pageSize := int64(limit)
	totalPages := (total + pageSize - 1) / pageSize // ceiling division
	response.OkWithData(map[string]interface{}{
		"items":       batchWorkflows,
		"total":       total,
		"page":        page,
		"limit":       limit,
		"total_pages": totalPages,
		"has_more":    int64(page) < totalPages,
	}, c)
}