feat: 合并更新到1.11.2

This commit is contained in:
npc0-hue
2026-01-15 18:02:46 +08:00
7447 changed files with 739734 additions and 263694 deletions
+26 -10
View File
@@ -1,3 +1,4 @@
import math
import time
import click
@@ -5,9 +6,10 @@ import click
import app
from extensions.ext_database import db
from models.account import TenantPluginAutoUpgradeStrategy
from tasks.process_tenant_plugin_autoupgrade_check_task import process_tenant_plugin_autoupgrade_check_task
from tasks import process_tenant_plugin_autoupgrade_check_task as check_task
AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes
MAX_CONCURRENT_CHECK_TASKS = 20
@app.celery.task(queue="plugin")
@@ -30,15 +32,29 @@ def check_upgradable_plugin_task():
.all()
)
for strategy in strategies:
process_tenant_plugin_autoupgrade_check_task.delay(
strategy.tenant_id,
strategy.strategy_setting,
strategy.upgrade_time_of_day,
strategy.upgrade_mode,
strategy.exclude_plugins,
strategy.include_plugins,
)
total_strategies = len(strategies)
click.echo(click.style(f"Total strategies: {total_strategies}", fg="green"))
batch_chunk_count = math.ceil(
total_strategies / MAX_CONCURRENT_CHECK_TASKS
) # make sure all strategies are checked in this interval
batch_interval_time = (AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL / batch_chunk_count) if batch_chunk_count > 0 else 0
for i in range(0, total_strategies, MAX_CONCURRENT_CHECK_TASKS):
batch_strategies = strategies[i : i + MAX_CONCURRENT_CHECK_TASKS]
for strategy in batch_strategies:
check_task.process_tenant_plugin_autoupgrade_check_task.delay(
strategy.tenant_id,
strategy.strategy_setting,
strategy.upgrade_time_of_day,
strategy.upgrade_mode,
strategy.exclude_plugins,
strategy.include_plugins,
)
# Only sleep if batch_interval_time > 0.0001 AND current batch is not the last one
if batch_interval_time > 0.0001 and i + MAX_CONCURRENT_CHECK_TASKS < total_strategies:
time.sleep(batch_interval_time)
end_at = time.perf_counter()
click.echo(
+2 -1
View File
@@ -7,6 +7,7 @@ from sqlalchemy.exc import SQLAlchemyError
import app
from configs import dify_config
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
@@ -63,7 +64,7 @@ def clean_messages():
plan = features.billing.subscription.plan
else:
plan = plan_cache.decode()
if plan == "sandbox":
if plan == CloudPlan.SANDBOX:
# clean related message
db.session.query(MessageFeedback).where(MessageFeedback.message_id == message.id).delete(
synchronize_session=False
+12 -13
View File
@@ -1,6 +1,6 @@
import datetime
import time
from typing import Optional, TypedDict
from typing import TypedDict
import click
from sqlalchemy import func, select
@@ -9,6 +9,7 @@ from sqlalchemy.exc import SQLAlchemyError
import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
@@ -17,7 +18,7 @@ from services.feature_service import FeatureService
class CleanupConfig(TypedDict):
clean_day: datetime.datetime
plan_filter: Optional[str]
plan_filter: str | None
add_logs: bool
@@ -35,7 +36,7 @@ def clean_unused_datasets_task():
},
{
"clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_PRO_CLEAN_DAY_SETTING),
"plan_filter": "sandbox",
"plan_filter": CloudPlan.SANDBOX,
"add_logs": False,
},
]
@@ -96,11 +97,11 @@ def clean_unused_datasets_task():
break
for dataset in datasets:
dataset_query = (
db.session.query(DatasetQuery)
.where(DatasetQuery.created_at > clean_day, DatasetQuery.dataset_id == dataset.id)
.all()
)
dataset_query = db.session.scalars(
select(DatasetQuery).where(
DatasetQuery.created_at > clean_day, DatasetQuery.dataset_id == dataset.id
)
).all()
if not dataset_query or len(dataset_query) == 0:
try:
@@ -121,15 +122,13 @@ def clean_unused_datasets_task():
if should_clean:
# Add auto disable log if required
if add_logs:
documents = (
db.session.query(Document)
.where(
documents = db.session.scalars(
select(Document).where(
Document.dataset_id == dataset.id,
Document.enabled == True,
Document.archived == False,
)
.all()
)
).all()
for document in documents:
dataset_auto_disable_log = DatasetAutoDisableLog(
tenant_id=dataset.tenant_id,
+59 -66
View File
@@ -1,8 +1,11 @@
import datetime
import logging
import time
from collections.abc import Sequence
import click
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
import app
from configs import dify_config
@@ -35,50 +38,53 @@ def clean_workflow_runlogs_precise():
retention_days = dify_config.WORKFLOW_LOG_RETENTION_DAYS
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days)
session_factory = sessionmaker(db.engine, expire_on_commit=False)
try:
total_workflow_runs = db.session.query(WorkflowRun).where(WorkflowRun.created_at < cutoff_date).count()
if total_workflow_runs == 0:
logger.info("No expired workflow run logs found")
return
logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
with session_factory.begin() as session:
total_workflow_runs = session.query(WorkflowRun).where(WorkflowRun.created_at < cutoff_date).count()
if total_workflow_runs == 0:
logger.info("No expired workflow run logs found")
return
logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
total_deleted = 0
failed_batches = 0
batch_count = 0
while True:
workflow_runs = (
db.session.query(WorkflowRun.id).where(WorkflowRun.created_at < cutoff_date).limit(BATCH_SIZE).all()
)
with session_factory.begin() as session:
workflow_run_ids = session.scalars(
select(WorkflowRun.id)
.where(WorkflowRun.created_at < cutoff_date)
.order_by(WorkflowRun.created_at, WorkflowRun.id)
.limit(BATCH_SIZE)
).all()
if not workflow_runs:
break
workflow_run_ids = [run.id for run in workflow_runs]
batch_count += 1
success = _delete_batch_with_retry(workflow_run_ids, failed_batches)
if success:
total_deleted += len(workflow_run_ids)
failed_batches = 0
else:
failed_batches += 1
if failed_batches >= MAX_RETRIES:
logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
if not workflow_run_ids:
break
batch_count += 1
success = _delete_batch(session, workflow_run_ids, failed_batches)
if success:
total_deleted += len(workflow_run_ids)
failed_batches = 0
else:
# Calculate incremental delay times: 5, 10, 15 minutes
retry_delay_minutes = failed_batches * 5
logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
time.sleep(retry_delay_minutes * 60)
continue
failed_batches += 1
if failed_batches >= MAX_RETRIES:
logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
break
else:
# Calculate incremental delay times: 5, 10, 15 minutes
retry_delay_minutes = failed_batches * 5
logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
time.sleep(retry_delay_minutes * 60)
continue
logger.info("Cleanup completed: %s expired workflow run logs deleted", total_deleted)
except Exception:
db.session.rollback()
logger.exception("Unexpected error in workflow log cleanup")
raise
@@ -87,69 +93,56 @@ def clean_workflow_runlogs_precise():
click.echo(click.style(f"Cleaned workflow run logs from db success latency: {execution_time:.2f}s", fg="green"))
def _delete_batch_with_retry(workflow_run_ids: list[str], attempt_count: int) -> bool:
"""Delete a single batch with a retry mechanism and complete cascading deletion"""
def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_count: int) -> bool:
"""Delete a single batch of workflow runs and all related data within a nested transaction."""
try:
with db.session.begin_nested():
with session.begin_nested():
message_data = (
db.session.query(Message.id, Message.conversation_id)
session.query(Message.id, Message.conversation_id)
.where(Message.workflow_run_id.in_(workflow_run_ids))
.all()
)
message_id_list = [msg.id for msg in message_data]
conversation_id_list = list({msg.conversation_id for msg in message_data if msg.conversation_id})
if message_id_list:
db.session.query(AppAnnotationHitHistory).where(
AppAnnotationHitHistory.message_id.in_(message_id_list)
).delete(synchronize_session=False)
message_related_models = [
AppAnnotationHitHistory,
MessageAgentThought,
MessageChain,
MessageFile,
MessageAnnotation,
MessageFeedback,
]
for model in message_related_models:
session.query(model).where(model.message_id.in_(message_id_list)).delete(synchronize_session=False) # type: ignore
# error: "DeclarativeAttributeIntercept" has no attribute "message_id". But this type is only in lib
# and these 6 types all have the message_id field.
db.session.query(MessageAgentThought).where(MessageAgentThought.message_id.in_(message_id_list)).delete(
session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete(
synchronize_session=False
)
db.session.query(MessageChain).where(MessageChain.message_id.in_(message_id_list)).delete(
synchronize_session=False
)
db.session.query(MessageFile).where(MessageFile.message_id.in_(message_id_list)).delete(
synchronize_session=False
)
db.session.query(MessageAnnotation).where(MessageAnnotation.message_id.in_(message_id_list)).delete(
synchronize_session=False
)
db.session.query(MessageFeedback).where(MessageFeedback.message_id.in_(message_id_list)).delete(
synchronize_session=False
)
db.session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete(
synchronize_session=False
)
db.session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
synchronize_session=False
)
db.session.query(WorkflowNodeExecutionModel).where(
session.query(WorkflowNodeExecutionModel).where(
WorkflowNodeExecutionModel.workflow_run_id.in_(workflow_run_ids)
).delete(synchronize_session=False)
if conversation_id_list:
db.session.query(ConversationVariable).where(
session.query(ConversationVariable).where(
ConversationVariable.conversation_id.in_(conversation_id_list)
).delete(synchronize_session=False)
db.session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete(
session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete(
synchronize_session=False
)
db.session.query(WorkflowRun).where(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
session.query(WorkflowRun).where(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
db.session.commit()
return True
return True
except Exception:
db.session.rollback()
logger.exception("Batch deletion failed (attempt %s)", attempt_count + 1)
return False
@@ -3,13 +3,15 @@ import time
from collections import defaultdict
import click
from sqlalchemy import select
import app
from configs import dify_config
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_mail import mail
from libs.email_i18n import EmailType, get_email_i18n_service
from models.account import Account, Tenant, TenantAccountJoin
from models import Account, Tenant, TenantAccountJoin
from models.dataset import Dataset, DatasetAutoDisableLog
from services.feature_service import FeatureService
@@ -31,9 +33,9 @@ def mail_clean_document_notify_task():
# send document clean notify mail
try:
dataset_auto_disable_logs = (
db.session.query(DatasetAutoDisableLog).where(DatasetAutoDisableLog.notified == False).all()
)
dataset_auto_disable_logs = db.session.scalars(
select(DatasetAutoDisableLog).where(DatasetAutoDisableLog.notified == False)
).all()
# group by tenant_id
dataset_auto_disable_logs_map: dict[str, list[DatasetAutoDisableLog]] = defaultdict(list)
for dataset_auto_disable_log in dataset_auto_disable_logs:
@@ -44,7 +46,7 @@ def mail_clean_document_notify_task():
for tenant_id, tenant_dataset_auto_disable_logs in dataset_auto_disable_logs_map.items():
features = FeatureService.get_features(tenant_id)
plan = features.billing.subscription.plan
if plan != "sandbox":
if plan != CloudPlan.SANDBOX:
knowledge_details = []
# check tenant
tenant = db.session.query(Tenant).where(Tenant.id == tenant_id).first()
+5
View File
@@ -16,6 +16,11 @@ celery_redis = Redis(
port=redis_config.get("port") or 6379,
password=redis_config.get("password") or None,
db=int(redis_config.get("virtual_host")) if redis_config.get("virtual_host") else 1,
ssl=bool(dify_config.BROKER_USE_SSL),
ssl_ca_certs=dify_config.REDIS_SSL_CA_CERTS if dify_config.BROKER_USE_SSL else None,
ssl_cert_reqs=getattr(dify_config, "REDIS_SSL_CERT_REQS", None) if dify_config.BROKER_USE_SSL else None,
ssl_certfile=getattr(dify_config, "REDIS_SSL_CERTFILE", None) if dify_config.BROKER_USE_SSL else None,
ssl_keyfile=getattr(dify_config, "REDIS_SSL_KEYFILE", None) if dify_config.BROKER_USE_SSL else None,
)
logger = logging.getLogger(__name__)
@@ -0,0 +1,104 @@
import logging
import math
import time
from collections.abc import Iterable, Sequence
from sqlalchemy import ColumnElement, and_, func, or_, select
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
import app
from configs import dify_config
from core.trigger.utils.locks import build_trigger_refresh_lock_keys
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.trigger import TriggerSubscription
from tasks.trigger_subscription_refresh_tasks import trigger_subscription_refresh
logger = logging.getLogger(__name__)
def _now_ts() -> int:
return int(time.time())
def _build_due_filter(now_ts: int):
"""Build SQLAlchemy filter for due credential or subscription refresh."""
credential_due: ColumnElement[bool] = and_(
TriggerSubscription.credential_expires_at != -1,
TriggerSubscription.credential_expires_at
<= now_ts + int(dify_config.TRIGGER_PROVIDER_CREDENTIAL_THRESHOLD_SECONDS),
)
subscription_due: ColumnElement[bool] = and_(
TriggerSubscription.expires_at != -1,
TriggerSubscription.expires_at <= now_ts + int(dify_config.TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS),
)
return or_(credential_due, subscription_due)
def _acquire_locks(keys: Iterable[str], ttl_seconds: int) -> list[bool]:
"""Attempt to acquire locks in a single pipelined round-trip.
Returns a list of booleans indicating which locks were acquired.
"""
pipe = redis_client.pipeline(transaction=False)
for key in keys:
pipe.set(key, b"1", ex=ttl_seconds, nx=True)
results = pipe.execute()
return [bool(r) for r in results]
@app.celery.task(queue="trigger_refresh_publisher")
def trigger_provider_refresh() -> None:
"""
Scan due trigger subscriptions and enqueue refresh tasks with in-flight locks.
"""
now: int = _now_ts()
batch_size: int = int(dify_config.TRIGGER_PROVIDER_REFRESH_BATCH_SIZE)
lock_ttl: int = max(300, int(dify_config.TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS))
with Session(db.engine, expire_on_commit=False) as session:
filter: ColumnElement[bool] = _build_due_filter(now_ts=now)
total_due: int = int(session.scalar(statement=select(func.count()).where(filter)) or 0)
logger.info("Trigger refresh scan start: due=%d", total_due)
if total_due == 0:
return
pages: int = math.ceil(total_due / batch_size)
for page in range(pages):
offset: int = page * batch_size
subscription_rows: Sequence[Row[tuple[str, str]]] = session.execute(
select(TriggerSubscription.tenant_id, TriggerSubscription.id)
.where(filter)
.order_by(TriggerSubscription.updated_at.asc())
.offset(offset)
.limit(batch_size)
).all()
if not subscription_rows:
logger.debug("Trigger refresh page %d/%d empty", page + 1, pages)
continue
subscriptions: list[tuple[str, str]] = [
(str(tenant_id), str(subscription_id)) for tenant_id, subscription_id in subscription_rows
]
lock_keys: list[str] = build_trigger_refresh_lock_keys(subscriptions)
acquired: list[bool] = _acquire_locks(keys=lock_keys, ttl_seconds=lock_ttl)
enqueued: int = 0
for (tenant_id, subscription_id), is_locked in zip(subscriptions, acquired):
if not is_locked:
continue
trigger_subscription_refresh.delay(tenant_id=tenant_id, subscription_id=subscription_id)
enqueued += 1
logger.info(
"Trigger refresh page %d/%d: scanned=%d locks_acquired=%d enqueued=%d",
page + 1,
pages,
len(subscriptions),
sum(1 for x in acquired if x),
enqueued,
)
logger.info("Trigger refresh scan done: due=%d", total_due)
@@ -1,6 +1,8 @@
import time
from collections.abc import Sequence
import click
from sqlalchemy import select
import app
from configs import dify_config
@@ -15,11 +17,9 @@ def update_tidb_serverless_status_task():
start_at = time.perf_counter()
try:
# check the number of idle tidb serverless
tidb_serverless_list = (
db.session.query(TidbAuthBinding)
.where(TidbAuthBinding.active == False, TidbAuthBinding.status == "CREATING")
.all()
)
tidb_serverless_list = db.session.scalars(
select(TidbAuthBinding).where(TidbAuthBinding.active == False, TidbAuthBinding.status == "CREATING")
).all()
if len(tidb_serverless_list) == 0:
return
# update tidb serverless status
@@ -32,7 +32,7 @@ def update_tidb_serverless_status_task():
click.echo(click.style(f"Update tidb serverless status task success latency: {end_at - start_at}", fg="green"))
def update_clusters(tidb_serverless_list: list[TidbAuthBinding]):
def update_clusters(tidb_serverless_list: Sequence[TidbAuthBinding]):
try:
# batch 20
for i in range(0, len(tidb_serverless_list), 20):
+116
View File
@@ -0,0 +1,116 @@
import logging
from celery import group, shared_task
from sqlalchemy import and_, select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from libs.schedule_utils import calculate_next_run_at
from models.trigger import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowSchedulePlan
from tasks.workflow_schedule_tasks import run_schedule_trigger
logger = logging.getLogger(__name__)
@shared_task(queue="schedule_poller")
def poll_workflow_schedules() -> None:
"""
Poll and process due workflow schedules.
Streaming flow:
1. Fetch due schedules in batches
2. Process each batch until all due schedules are handled
3. Optional: Limit total dispatches per tick as a circuit breaker
"""
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
with session_factory() as session:
total_dispatched = 0
# Process in batches until we've handled all due schedules or hit the limit
while True:
due_schedules = _fetch_due_schedules(session)
if not due_schedules:
break
dispatched_count = _process_schedules(session, due_schedules)
total_dispatched += dispatched_count
logger.debug("Batch processed: %d dispatched", dispatched_count)
# Circuit breaker: check if we've hit the per-tick limit (if enabled)
if (
dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK > 0
and total_dispatched >= dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK
):
logger.warning(
"Circuit breaker activated: reached dispatch limit (%d), will continue next tick",
dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK,
)
break
if total_dispatched > 0:
logger.info("Total processed: %d dispatched", total_dispatched)
def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]:
"""
Fetch a batch of due schedules, sorted by most overdue first.
Returns up to WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE schedules per call.
Used in a loop to progressively process all due schedules.
"""
now = naive_utc_now()
due_schedules = session.scalars(
(
select(WorkflowSchedulePlan)
.join(
AppTrigger,
and_(
AppTrigger.app_id == WorkflowSchedulePlan.app_id,
AppTrigger.node_id == WorkflowSchedulePlan.node_id,
AppTrigger.trigger_type == AppTriggerType.TRIGGER_SCHEDULE,
),
)
.where(
WorkflowSchedulePlan.next_run_at <= now,
WorkflowSchedulePlan.next_run_at.isnot(None),
AppTrigger.status == AppTriggerStatus.ENABLED,
)
)
.order_by(WorkflowSchedulePlan.next_run_at.asc())
.with_for_update(skip_locked=True)
.limit(dify_config.WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE)
)
return list(due_schedules)
def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> int:
"""Process schedules: check quota, update next run time and dispatch to Celery in parallel."""
if not schedules:
return 0
tasks_to_dispatch: list[str] = []
for schedule in schedules:
next_run_at = calculate_next_run_at(
schedule.cron_expression,
schedule.timezone,
)
schedule.next_run_at = next_run_at
tasks_to_dispatch.append(schedule.id)
if tasks_to_dispatch:
job = group(run_schedule_trigger.s(schedule_id) for schedule_id in tasks_to_dispatch)
job.apply_async()
logger.debug("Dispatched %d tasks in parallel", len(tasks_to_dispatch))
session.commit()
return len(tasks_to_dispatch)