Merge upstream/release/e-1.11.2 into 1.11.4

# Conflicts:
#	.github/workflows/tool-test-sdks.yaml
#	.github/workflows/translate-i18n-claude.yml
#	api/core/rag/datasource/retrieval_service.py
#	web/app/components/app/create-app-dialog/app-list/index.tsx
#	web/app/components/header/account-setting/members-page/operation/index.tsx
#	web/i18n-config/server.ts
#	web/package.json
#	web/pnpm-lock.yaml
This commit is contained in:
npc0-hue
2026-01-16 15:53:11 +08:00
15 changed files with 743 additions and 1194 deletions
+24 -50
View File
@@ -1,5 +1,4 @@
import concurrent.futures
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any
@@ -14,7 +13,7 @@ from core.model_runtime.entities.model_entities import ModelType
from core.rag.data_post_processor.data_post_processor import DataPostProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.embedding.retrieval import RetrievalChildChunk, RetrievalSegments
from core.rag.embedding.retrieval import RetrievalSegments
from core.rag.entities.metadata_entities import MetadataCondition
from core.rag.index_processor.constant.doc_type import DocType
from core.rag.index_processor.constant.index_type import IndexStructureType
@@ -37,8 +36,6 @@ default_retrieval_model = {
"score_threshold_enabled": False,
}
logger = logging.getLogger(__name__)
class RetrievalService:
# Cache precompiled regular expressions to avoid repeated compilation
@@ -109,12 +106,7 @@ class RetrievalService:
)
)
if futures:
for future in concurrent.futures.as_completed(futures, timeout=3600):
if exceptions:
for f in futures:
f.cancel()
break
concurrent.futures.wait(futures, timeout=3600, return_when=concurrent.futures.ALL_COMPLETED)
if exceptions:
raise ValueError(";\n".join(exceptions))
@@ -218,7 +210,6 @@ class RetrievalService:
)
all_documents.extend(documents)
except Exception as e:
logger.error(e, exc_info=True)
exceptions.append(str(e))
@classmethod
@@ -312,7 +303,6 @@ class RetrievalService:
else:
all_documents.extend(documents)
except Exception as e:
logger.error(e, exc_info=True)
exceptions.append(str(e))
@classmethod
@@ -361,7 +351,6 @@ class RetrievalService:
else:
all_documents.extend(documents)
except Exception as e:
logger.error(e, exc_info=True)
exceptions.append(str(e))
@staticmethod
@@ -427,12 +416,12 @@ class RetrievalService:
child_index_node_ids = [i for i in child_index_node_ids if i]
index_node_ids = [i for i in index_node_ids if i]
segment_ids: list[str] = []
segment_ids = []
index_node_segments: list[DocumentSegment] = []
segments: list[DocumentSegment] = []
attachment_map: dict[str, list[dict[str, Any]]] = {}
child_chunk_map: dict[str, list[ChildChunk]] = {}
doc_segment_map: dict[str, list[str]] = {}
attachment_map = {}
child_chunk_map: dict[Any, Any] = {}
doc_segment_map = {}
with session_factory.create_session() as session:
attachments = cls.get_segment_attachment_infos(image_doc_ids, session)
@@ -443,7 +432,7 @@ class RetrievalService:
attachment_map[attachment["segment_id"]].append(attachment["attachment_info"])
else:
attachment_map[attachment["segment_id"]] = [attachment["attachment_info"]]
if attachment["segment_id"] in doc_segment_map:
if attachment["attachment_id"] in doc_segment_map:
doc_segment_map[attachment["segment_id"]].append(attachment["attachment_id"])
else:
doc_segment_map[attachment["segment_id"]] = [attachment["attachment_id"]]
@@ -513,7 +502,7 @@ class RetrievalService:
"child_chunks": child_chunk_details,
}
segment_child_map[segment.id] = map_detail
record: dict[str, Any] = {
record = {
"segment": segment,
}
records.append(record)
@@ -521,13 +510,13 @@ class RetrievalService:
if segment.id not in include_segment_ids:
include_segment_ids.add(segment.id)
max_score = 0.0
segment_document = doc_to_document_map.get(segment.index_node_id)
if segment_document:
max_score = max(max_score, segment_document.metadata.get("score", 0.0))
document = doc_to_document_map.get(segment.index_node_id)
if document:
max_score = max(max_score, document.metadata.get("score", 0.0))
for attachment_info in attachment_infos:
file_doc = doc_to_document_map.get(attachment_info["id"])
if file_doc:
max_score = max(max_score, file_doc.metadata.get("score", 0.0))
file_document = doc_to_document_map.get(attachment_info["id"])
if file_document:
max_score = max(max_score, file_document.metadata.get("score", 0.0))
record = {
"segment": segment,
"score": max_score,
@@ -542,26 +531,18 @@ class RetrievalService:
if record["segment"].id in attachment_map:
record["files"] = attachment_map[record["segment"].id] # type: ignore[assignment]
result: list[RetrievalSegments] = []
result = []
for record in records:
# Extract segment
segment = record["segment"]
# Extract child_chunks, ensuring it's a list or None
raw_child_chunks = record.get("child_chunks")
child_chunks_list: list[RetrievalChildChunk] | None = None
if isinstance(raw_child_chunks, list):
# Sort by score descending
sorted_chunks = sorted(raw_child_chunks, key=lambda x: x.get("score", 0.0), reverse=True)
child_chunks_list = [
RetrievalChildChunk(
id=chunk["id"],
content=chunk["content"],
score=chunk.get("score", 0.0),
position=chunk["position"],
)
for chunk in sorted_chunks
]
child_chunks = record.get("child_chunks")
if not isinstance(child_chunks, list):
child_chunks = None
if child_chunks:
child_chunks = sorted(child_chunks, key=lambda x: x.get("score", 0.0), reverse=True)
# Extract files, ensuring it's a list or None
files = record.get("files")
@@ -578,11 +559,11 @@ class RetrievalService:
# Create RetrievalSegments object
retrieval_segment = RetrievalSegments(
segment=segment, child_chunks=child_chunks_list, score=score, files=files
segment=segment, child_chunks=child_chunks, score=score, files=files
)
result.append(retrieval_segment)
return sorted(result, key=lambda x: x.score if x.score is not None else 0.0, reverse=True)
return sorted(result, key=lambda x: x.score, reverse=True)
except Exception as e:
db.session.rollback()
raise e
@@ -674,14 +655,7 @@ class RetrievalService:
document_ids_filter=document_ids_filter,
)
)
# Use as_completed for early error propagation - cancel remaining futures on first error
if futures:
for future in concurrent.futures.as_completed(futures, timeout=300):
if future.exception():
# Cancel remaining futures to avoid unnecessary waiting
for f in futures:
f.cancel()
break
concurrent.futures.wait(futures, timeout=300, return_when=concurrent.futures.ALL_COMPLETED)
if exceptions:
raise ValueError(";\n".join(exceptions))
+2
View File
@@ -6,6 +6,7 @@ from .create_site_record_when_app_created import handle as handle_create_site_re
from .delete_tool_parameters_cache_when_sync_draft_workflow import (
handle as handle_delete_tool_parameters_cache_when_sync_draft_workflow,
)
from .queue_credential_sync_when_tenant_created import handle as handle_queue_credential_sync_when_tenant_created
from .sync_plugin_trigger_when_app_created import handle as handle_sync_plugin_trigger_when_app_created
from .sync_webhook_when_app_created import handle as handle_sync_webhook_when_app_created
from .sync_workflow_schedule_when_app_published import handle as handle_sync_workflow_schedule_when_app_published
@@ -33,6 +34,7 @@ __all__ = [
"handle_create_installed_app_when_app_created",
"handle_create_site_record_when_app_created",
"handle_delete_tool_parameters_cache_when_sync_draft_workflow",
"handle_queue_credential_sync_when_tenant_created",
"handle_sync_plugin_trigger_when_app_created",
"handle_sync_webhook_when_app_created",
"handle_sync_workflow_schedule_when_app_published",
@@ -0,0 +1,19 @@
from configs import dify_config
from events.tenant_event import tenant_was_created
from services.enterprise.workspace_sync import WorkspaceSyncService
@tenant_was_created.connect
def handle(sender, **kwargs):
"""Queue credential sync when a tenant/workspace is created."""
# Only queue sync tasks if plugin manager (enterprise feature) is enabled
if not dify_config.ENTERPRISE_ENABLED:
return
tenant = sender
# Determine source from kwargs if available, otherwise use generic
source = kwargs.get("source", "tenant_created")
# Queue credential sync task to Redis for enterprise backend to process
WorkspaceSyncService.queue_credential_sync(tenant.id, source=source)
+58
View File
@@ -0,0 +1,58 @@
import json
import logging
import uuid
from datetime import UTC, datetime
from redis import RedisError
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__)
WORKSPACE_SYNC_QUEUE = "enterprise:workspace:sync:queue"
WORKSPACE_SYNC_PROCESSING = "enterprise:workspace:sync:processing"
class WorkspaceSyncService:
"""Service to publish workspace sync tasks to Redis queue for enterprise backend consumption"""
@staticmethod
def queue_credential_sync(workspace_id: str, *, source: str) -> bool:
"""
Queue a credential sync task for a newly created workspace.
This publishes a task to Redis that will be consumed by the enterprise backend
worker to sync credentials with the plugin-manager.
Args:
workspace_id: The workspace/tenant ID to sync credentials for
source: Source of the sync request (for debugging/tracking)
Returns:
bool: True if task was queued successfully, False otherwise
"""
try:
task = {
"task_id": str(uuid.uuid4()),
"workspace_id": workspace_id,
"retry_count": 0,
"created_at": datetime.now(UTC).isoformat(),
"source": source,
}
# Push to Redis list (queue) - LPUSH adds to the head, worker consumes from tail with RPOP
redis_client.lpush(WORKSPACE_SYNC_QUEUE, json.dumps(task))
logger.info(
"Queued credential sync task for workspace %s, task_id: %s, source: %s",
workspace_id,
task["task_id"],
source,
)
return True
except (RedisError, TypeError) as e:
logger.error("Failed to queue credential sync for workspace %s: %s", workspace_id, str(e), exc_info=True)
# Don't raise - we don't want to fail workspace creation if queueing fails
# The scheduled task will catch it later
return False