mirror of
https://github.com/YFGaia/dify-plus.git
synced 2026-06-04 10:14:00 +08:00
232 lines
11 KiB
Python
Executable File
232 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
Repair stuck documents: find documents whose indexing has actually finished
but whose status was never updated, and fix their status.

Usage:
    python scripts/fix_stuck_documents.py
"""
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# 添加项目根目录到 Python 路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root / "api"))
|
||
|
||
from sqlalchemy import func, select
|
||
from app_factory import create_app
|
||
from extensions.ext_database import db
|
||
from libs.datetime_utils import naive_utc_now
|
||
from models.dataset import Document, DocumentSegment
|
||
|
||
|
||
def fix_stuck_documents() -> None:
    """Repair documents whose status is stuck at 'indexing'.

    Creates the Flask app, and inside its application context:

    1. Loads every document with ``indexing_status == 'indexing'`` and, for
       each one, inspects its segments. Documents whose segments are all
       'completed' are marked completed. Documents that still have
       'indexing' segments are collected for an interactive repair.
    2. Prints a read-only report of paused documents whose segments are all
       completed.

    Exits the process with status 1 if committing the changes fails.
    """
    app = create_app()

    with app.app_context():
        indexing_documents = db.session.query(Document).filter(
            Document.indexing_status == 'indexing'
        ).all()

        if not indexing_documents:
            print("✓ 没有找到状态为 'indexing' 的文档。")
        else:
            print(f"找到 {len(indexing_documents)} 个状态为 'indexing' 的文档,开始检查...\n")
            _repair_indexing_documents(indexing_documents)

        _report_paused_documents()


def _count_status(segments, status: str) -> int:
    """Return how many of *segments* have ``status`` equal to *status*."""
    return sum(1 for segment in segments if segment.status == status)


def _is_affirmative(answer) -> bool:
    """Return True when *answer* is 'yes' or 'y', ignoring case and padding."""
    return bool(answer) and answer.strip().lower() in ('yes', 'y')


def _select_repairable_segments(indexing_segments, stale_before):
    """Choose which 'indexing' segments should be offered for repair.

    Returns ``(segments, already_indexed)``:

    * If any segment has an ``index_node_id``, those segments are returned
      with ``already_indexed=True``.
    * Otherwise the segments whose ``indexing_at`` is set and earlier than
      *stale_before* are returned with ``already_indexed=False``.

    The whole list is always scanned; checking only a leading sample could
    miss indexed segments that appear later in the list.
    """
    already_indexed = [s for s in indexing_segments if s.index_node_id]
    if already_indexed:
        return already_indexed, True

    timed_out = [
        s for s in indexing_segments
        if s.indexing_at and s.indexing_at < stale_before
    ]
    return timed_out, False


def _repair_segment(segment, now) -> bool:
    """Resolve one stuck segment in place.

    A segment with an ``index_node_id`` is marked completed and enabled
    (returns True); one without is marked as a timeout error (returns False).
    """
    if segment.index_node_id:
        segment.status = 'completed'
        segment.completed_at = now
        segment.enabled = True
        return True

    segment.status = 'error'
    segment.error = '索引超时:段落处理超过1小时未完成'
    return False


def _mark_document_completed(doc, segments, completed_at) -> None:
    """Flip *doc* to 'completed', clear its error and refresh its token total.

    ``doc.tokens`` is only overwritten when the segments report a non-zero
    token sum, so an existing value is never replaced by zero.
    """
    doc.indexing_status = 'completed'
    doc.completed_at = completed_at
    if doc.error:
        doc.error = None

    total_tokens = sum(s.tokens for s in segments if s.tokens)
    if total_tokens:
        doc.tokens = total_tokens


def _load_segments(document_id):
    """Return all DocumentSegment rows that belong to *document_id*."""
    return db.session.query(DocumentSegment).filter(
        DocumentSegment.document_id == document_id
    ).all()


def _repair_indexing_documents(indexing_documents) -> None:
    """Inspect the given 'indexing' documents, repair them and commit."""
    from datetime import timedelta  # imported once here instead of inside the loop

    fixed_docs = []
    stuck_docs = []

    for doc in indexing_documents:
        segments = _load_segments(doc.id)

        if not segments:
            # No segments at all: probably an abnormal document, leave it alone.
            print(f" ⚠️ 文档 {doc.id} ({doc.name[:50]}) 没有段落,跳过")
            continue

        total_segments = len(segments)
        completed_segments = _count_status(segments, 'completed')
        indexing_segments = _count_status(segments, 'indexing')
        waiting_segments = _count_status(segments, 'waiting')

        if completed_segments == total_segments:
            # Every segment is done but the document still says 'indexing'.
            print(f" ✓ 文档 {doc.id} ({doc.name[:50]})")
            print(f" 段落: {completed_segments}/{total_segments} 已完成")
            print(" 状态: indexing -> completed")
            _mark_document_completed(doc, segments, naive_utc_now())
            fixed_docs.append(doc)
            continue

        if not (indexing_segments > 0 or waiting_segments > 0):
            continue

        # Some segments are still in progress; they may be stuck.
        print(f" ⚠️ 文档 {doc.id} ({doc.name[:50]})")
        print(f" 段落状态: 已完成 {completed_segments}, 索引中 {indexing_segments}, 等待中 {waiting_segments}, 总计 {total_segments}")

        indexing_segments_list = [s for s in segments if s.status == 'indexing']
        one_hour_ago = naive_utc_now() - timedelta(hours=1)
        repairable, already_indexed = _select_repairable_segments(
            indexing_segments_list, one_hour_ago
        )
        if not repairable:
            continue

        if already_indexed:
            print(f" ⚠️ 发现 {len(repairable)}/{indexing_segments} 个段落状态为'索引中'但已有索引节点(实际已完成)")
        else:
            print(f" 发现 {len(repairable)} 个卡住的段落(超过1小时)")
        stuck_docs.append((doc, repairable))

    confirmed = False
    if stuck_docs:
        print(f"\n发现 {len(stuck_docs)} 个文档有需要修复的段落")
        total_stuck_segments = sum(len(segs) for _, segs in stuck_docs)
        print(f"总共需要修复 {total_stuck_segments} 个段落")
        print("是否要将这些段落标记为已完成?(yes/no): ", end='')
        try:
            answer = input()
        except EOFError:
            # No stdin (cron, pipe): decline rather than crash.
            answer = ''
        confirmed = _is_affirmative(answer)

        if confirmed:
            _apply_segment_repairs(stuck_docs, fixed_docs)

    if not (fixed_docs or confirmed):
        print("\n没有需要修复的文档。")
        return

    # Keep the try body minimal: only the commit can fail here.
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        print(f"\n✗ 错误:提交更改时出错: {e}")
        sys.exit(1)

    print(f"\n✓ 成功修复 {len(fixed_docs)} 个文档的状态")
    if confirmed:
        print("✓ 修复了卡住的段落")


def _apply_segment_repairs(stuck_docs, fixed_docs) -> None:
    """Repair the collected segments, then re-check their documents.

    Documents whose segments are all completed afterwards are marked
    completed and appended to *fixed_docs*.
    """
    fixed_segments_count = 0
    for doc, stuck_segments in stuck_docs:
        print(f"\n处理文档 {doc.id} ({doc.name[:50]}) 的 {len(stuck_segments)} 个段落...")
        for segment in stuck_segments:
            completed = _repair_segment(segment, naive_utc_now())
            fixed_segments_count += 1

            # Only the first 20 repairs are listed individually.
            if fixed_segments_count <= 20:
                if completed:
                    print(f" ✓ 段落 {segment.id[:8]}... 标记为已完成(有索引节点)")
                else:
                    print(f" ⚠️ 段落 {segment.id[:8]}... 标记为错误(无索引节点)")
            elif fixed_segments_count == 21:
                print(" ... (更多段落正在更新)")

    print(f"\n已处理 {fixed_segments_count} 个段落,重新检查文档状态...")

    for doc, _ in stuck_docs:
        # Reload the document and its segments before re-counting.
        db.session.refresh(doc)
        segments = _load_segments(doc.id)
        completed_count = _count_status(segments, 'completed')
        total_count = len(segments)

        print(f" 文档 {doc.id}: {completed_count}/{total_count} 段落已完成")

        if completed_count == total_count:
            _mark_document_completed(doc, segments, naive_utc_now())
            fixed_docs.append(doc)
            print(f" ✓ 文档 {doc.id} 现在可以标记为已完成")
        elif completed_count > 0:
            print(f" ⚠️ 文档 {doc.id} 还有 {total_count - completed_count} 个段落未完成")


def _report_paused_documents() -> None:
    """Print paused documents whose segments are all completed (read-only)."""
    print("\n" + "="*60)
    print("检查暂停状态但已完成的文档...")

    paused_documents = db.session.query(Document).filter(
        Document.is_paused == True,  # noqa: E712 - SQLAlchemy column comparison
        Document.indexing_status.in_(['waiting', 'parsing', 'cleaning', 'splitting', 'indexing'])
    ).all()

    if not paused_documents:
        return

    print(f"找到 {len(paused_documents)} 个暂停的文档,检查段落状态...")

    for doc in paused_documents:
        segments = _load_segments(doc.id)
        if not segments:
            continue

        completed_count = _count_status(segments, 'completed')
        total_count = len(segments)

        if completed_count == total_count:
            print(f" ✓ 文档 {doc.id} ({doc.name[:50]}) 已暂停但段落都完成了")
            print(" 建议:可以取消暂停并标记为已完成")
|
||
|
||
# Script entry point: run the repair only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    fix_stuck_documents()