mirror of
https://github.com/YFGaia/dify-plus.git
synced 2026-06-04 10:14:00 +08:00
17832f2424
本次提交整合了多个功能改进和问题修复: 主要功能: - 批量工作流处理功能完善,支持 Excel 上传和进度跟踪 - 管理中心反向代理和转发配置优化 - 用户同步添加互斥锁,防止并发问题 - 计费系统和额度显示优化 - AI 绘图功能扩展 前端改进: - 文本生成应用显示修复 - 批量任务进度展示优化 - 按钮样式和 CSS 优化,禁止换行 - 多语言支持完善(新增印尼语等) - 构建镜像逻辑优化 - 批量处理进度管理器实现 后端改进: - Docker Compose 配置升级 - 队列任务和 Worker Pool 优化 - Admin API 初始化和验证逻辑改进 - 数据库迁移和初始化完善 - 静态变量处理优化 - URL 签名助手实现 - Celery 扩展优化 - 代码和导入包问题修复(idea 自动调整代码位置) 技术改进: - 兼容性修复 (flask-restx, jschardet) - 钉钉 Web API 版本更新 - 代码格式化和导入包问题修复 - 日志处理优化 - 工作流循环管理优化 Docker 相关: - Nginx 配置更新 - 容器启动脚本优化 - 镜像构建流程改进 - docker-compose.dify-plus.yaml 大幅更新 管理后台: - 工作流批量处理 API 实现 - 工作池初始化 - 批量工作流服务实现 - 转发扩展配置 - 用户服务扩展
303 lines
9.0 KiB
TypeScript
303 lines
9.0 KiB
TypeScript
/**
|
|
* 全局批量进度轮询管理器
|
|
* 解决多个BatchProgress组件同时轮询导致的性能问题
|
|
*/
|
|
|
|
import { fetchProgressApi } from '@/service/web-extend'
|
|
|
|
export type BatchStatus = 'pending' | 'processing' | 'completed' | 'failed' | 'stopped'
|
|
|
|
export type BatchProgressData = {
|
|
id: string
|
|
status: BatchStatus
|
|
total_rows: number
|
|
processed_rows: number
|
|
progress: number
|
|
pending_count: number
|
|
running_count: number
|
|
completed_count: number
|
|
failed_count: number
|
|
error?: string // 添加错误信息字段
|
|
created_at: string
|
|
updated_at: string
|
|
}
|
|
|
|
type BatchProgressListener = (data: BatchProgressData | null) => void
|
|
|
|
class BatchProgressManager {
|
|
private subscribers: Map<string, Set<BatchProgressListener>> = new Map()
|
|
private pollingTimer: NodeJS.Timeout | null = null
|
|
private pollingInterval = 3000 // 3秒轮询间隔
|
|
private lastProgressData: Map<string, BatchProgressData> = new Map()
|
|
private forcePollingBatches: Map<string, number> = new Map() // batchId -> until timestamp
|
|
|
|
private static instance: BatchProgressManager | null = null
|
|
|
|
public static getInstance(): BatchProgressManager {
|
|
if (!BatchProgressManager.instance)
|
|
BatchProgressManager.instance = new BatchProgressManager()
|
|
|
|
return BatchProgressManager.instance
|
|
}
|
|
|
|
private constructor() {
|
|
// 页面可见性变化时调整轮询频率
|
|
if (typeof document !== 'undefined') {
|
|
document.addEventListener('visibilitychange', () => {
|
|
if (document.visibilityState === 'visible') {
|
|
this.adjustPollingInterval(5000) // 页面可见时正常轮询
|
|
this.pollAllBatches() // 立即轮询一次
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 订阅批量任务进度更新
|
|
*/
|
|
public subscribe(batchId: string, listener: BatchProgressListener): () => void {
|
|
if (!this.subscribers.has(batchId))
|
|
this.subscribers.set(batchId, new Set())
|
|
|
|
this.subscribers.get(batchId)!.add(listener)
|
|
|
|
// 如果有运行时缓存数据,立即通知
|
|
const cachedData = this.lastProgressData.get(batchId)
|
|
if (cachedData)
|
|
listener(cachedData)
|
|
|
|
// 开始轮询
|
|
this.startPolling()
|
|
|
|
// 返回取消订阅函数
|
|
return () => {
|
|
this.unsubscribe(batchId, listener)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 取消订阅
|
|
*/
|
|
public unsubscribe(batchId: string, listener: BatchProgressListener): void {
|
|
const listeners = this.subscribers.get(batchId)
|
|
if (listeners) {
|
|
listeners.delete(listener)
|
|
if (listeners.size === 0) {
|
|
this.subscribers.delete(batchId)
|
|
this.lastProgressData.delete(batchId)
|
|
this.forcePollingBatches.delete(batchId)
|
|
}
|
|
}
|
|
|
|
// 如果没有订阅者了,停止轮询
|
|
if (this.subscribers.size === 0)
|
|
this.stopPolling()
|
|
}
|
|
|
|
/**
|
|
* 强制轮询特定批量任务一段时间(用于重试/恢复后)
|
|
*/
|
|
public forcePolling(batchId: string, durationMs: number = 15000): void {
|
|
this.forcePollingBatches.set(batchId, Date.now() + durationMs)
|
|
}
|
|
|
|
/**
|
|
* 立即获取特定批量任务的进度
|
|
*/
|
|
public async fetchProgress(batchId: string): Promise<BatchProgressData | null> {
|
|
try {
|
|
const data = await fetchProgressApi(batchId)
|
|
if (data) {
|
|
// 验证和修复数据
|
|
const sanitizedData = this.sanitizeProgressData(data, batchId)
|
|
this.lastProgressData.set(batchId, sanitizedData)
|
|
this.notifyListeners(batchId, sanitizedData)
|
|
return sanitizedData
|
|
}
|
|
return data
|
|
}
|
|
catch (error) {
|
|
console.error(`获取批量任务 ${batchId} 进度失败:`, error)
|
|
return null
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 验证和修复进度数据
|
|
*/
|
|
private sanitizeProgressData(data: any, batchId: string): BatchProgressData {
|
|
// 确保数字字段的有效性
|
|
const total_rows = isNaN(Number(data.total_rows)) ? 0 : Number(data.total_rows)
|
|
const processed_rows = isNaN(Number(data.processed_rows)) ? 0 : Number(data.processed_rows)
|
|
const pending_count = isNaN(Number(data.pending_count)) ? 0 : Number(data.pending_count)
|
|
const running_count = isNaN(Number(data.running_count)) ? 0 : Number(data.running_count)
|
|
const completed_count = isNaN(Number(data.completed_count)) ? 0 : Number(data.completed_count)
|
|
const failed_count = isNaN(Number(data.failed_count)) ? 0 : Number(data.failed_count)
|
|
|
|
// 计算进度百分比
|
|
let progress = 0
|
|
if (total_rows > 0)
|
|
progress = (processed_rows / total_rows) * 100
|
|
|
|
if (isNaN(progress))
|
|
progress = 0
|
|
|
|
// 确保日期字段的有效性
|
|
let created_at = data.created_at
|
|
let updated_at = data.updated_at
|
|
|
|
if (!created_at || isNaN(new Date(created_at).getTime()))
|
|
created_at = new Date().toISOString()
|
|
|
|
if (!updated_at || isNaN(new Date(updated_at).getTime()))
|
|
updated_at = new Date().toISOString()
|
|
|
|
return {
|
|
id: data.id || batchId,
|
|
status: data.status || 'pending',
|
|
total_rows,
|
|
processed_rows,
|
|
progress,
|
|
pending_count,
|
|
running_count,
|
|
completed_count,
|
|
failed_count,
|
|
error: data.error || undefined, // 添加错误信息字段
|
|
created_at,
|
|
updated_at,
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 调整轮询间隔
|
|
*/
|
|
private adjustPollingInterval(interval: number): void {
|
|
if (this.pollingInterval !== interval) {
|
|
this.pollingInterval = interval
|
|
if (this.pollingTimer) {
|
|
this.stopPolling()
|
|
this.startPolling()
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 开始轮询
|
|
*/
|
|
private startPolling(): void {
|
|
if (this.pollingTimer || this.subscribers.size === 0)
|
|
return
|
|
|
|
this.pollingTimer = setInterval(() => {
|
|
this.pollAllBatches()
|
|
}, this.pollingInterval)
|
|
|
|
// 立即执行一次轮询
|
|
this.pollAllBatches()
|
|
}
|
|
|
|
/**
|
|
* 停止轮询
|
|
*/
|
|
private stopPolling(): void {
|
|
if (this.pollingTimer) {
|
|
clearInterval(this.pollingTimer)
|
|
this.pollingTimer = null
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 轮询所有需要更新的批量任务
|
|
*/
|
|
private async pollAllBatches(): Promise<void> {
|
|
const now = Date.now()
|
|
const batchesToPoll: string[] = []
|
|
|
|
for (const [batchId] of this.subscribers) {
|
|
// 检查是否是已完成/失败的缓存任务,如果是则跳过轮询
|
|
const completedCachedData = this.cachedCompletedTasks.get(batchId)
|
|
if (completedCachedData && (completedCachedData.status === 'completed' || completedCachedData.status === 'failed'))
|
|
continue // 跳过已完成和已失败的任务,减少不必要的请求
|
|
|
|
const lastData = this.lastProgressData.get(batchId)
|
|
const forceUntil = this.forcePollingBatches.get(batchId)
|
|
|
|
// 决定是否需要轮询
|
|
const shouldForcePoll = forceUntil && now < forceUntil
|
|
const shouldPollForActiveStatus = lastData?.status === 'processing' || lastData?.status === 'pending'
|
|
const shouldPollForNoData = !lastData
|
|
// 跳过已完成和已失败的任务(除非强制轮询)
|
|
const shouldSkipCompletedTasks = lastData?.status === 'completed' || lastData?.status === 'failed'
|
|
|
|
if ((shouldForcePoll || shouldPollForNoData || shouldPollForActiveStatus) && !shouldSkipCompletedTasks)
|
|
batchesToPoll.push(batchId)
|
|
|
|
// 清理过期的强制轮询
|
|
if (forceUntil && now >= forceUntil)
|
|
this.forcePollingBatches.delete(batchId)
|
|
}
|
|
|
|
// 批量获取进度 - 这里可以考虑后端提供批量API以进一步优化
|
|
if (batchesToPoll.length > 0) {
|
|
// 为了避免同时发送太多请求,分批处理
|
|
const batchSize = 10
|
|
for (let i = 0; i < batchesToPoll.length; i += batchSize) {
|
|
const batch = batchesToPoll.slice(i, i + batchSize)
|
|
await Promise.all(batch.map(batchId => this.fetchProgress(batchId)))
|
|
|
|
// 在批次之间稍微延迟,避免请求过于密集
|
|
if (i + batchSize < batchesToPoll.length)
|
|
await new Promise(resolve => setTimeout(resolve, 100))
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 通知监听器
|
|
*/
|
|
private notifyListeners(batchId: string, data: BatchProgressData | null): void {
|
|
const listeners = this.subscribers.get(batchId)
|
|
if (listeners) {
|
|
listeners.forEach((listener) => {
|
|
try {
|
|
listener(data)
|
|
}
|
|
catch (error) {
|
|
console.error('批量进度监听器错误:', error)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 获取当前订阅的批量任务数量(用于调试)
|
|
*/
|
|
public getSubscribedBatchCount(): number {
|
|
return this.subscribers.size
|
|
}
|
|
|
|
/**
|
|
* 获取当前轮询间隔(用于调试)
|
|
*/
|
|
public getPollingInterval(): number {
|
|
return this.pollingInterval
|
|
}
|
|
|
|
/**
|
|
* 清理特定任务的完成缓存(公开方法,供组件调用)
|
|
*/
|
|
public clearCompletedTaskCache(batchId: string): void {
|
|
this.removeFromCompletedCache(batchId)
|
|
}
|
|
|
|
/**
|
|
* 检查任务是否已缓存为完成状态
|
|
*/
|
|
public isTaskCompleted(batchId: string): boolean {
|
|
const cachedData = this.cachedCompletedTasks.get(batchId)
|
|
return cachedData ? (cachedData.status === 'completed' || cachedData.status === 'failed') : false
|
|
}
|
|
}
|
|
|
|
export default BatchProgressManager.getInstance()
|