fix: Fix several document status issues (#1699)

parent  a37a6184b4
commit  da7e9b1460

@@ -181,7 +181,8 @@ class ListenerManagement:
         def aggregation_document_status():
             sql = get_file_content(
                 os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql'))
-            native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql)
+            native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql,
+                          with_table_name=True)

         return aggregation_document_status

@@ -190,7 +191,7 @@ class ListenerManagement:
         def aggregation_document_status():
             sql = get_file_content(
                 os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql'))
-            native_update({'document_custom_sql': queryset}, sql)
+            native_update({'document_custom_sql': queryset}, sql, with_table_name=True)

         return aggregation_document_status

@@ -249,19 +250,23 @@ class ListenerManagement:
         """
         if not try_lock('embedding' + str(document_id)):
             return
-        max_kb.info(f"开始--->向量化文档:{document_id}")
-        # Batch-update the document status
-        ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.STARTED)
         try:
-            # Delete the document's vector data
-            VectorStore.get_embedding_vector().delete_by_document_id(document_id)
-
             def is_the_task_interrupted():
                 document = QuerySet(Document).filter(id=document_id).first()
                 if document is None or Status(document.status)[TaskType.EMBEDDING] == State.REVOKE:
                     return True
                 return False

+            if is_the_task_interrupted():
+                return
+            max_kb.info(f"开始--->向量化文档:{document_id}")
+            # Batch-update the document status
+            ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING,
+                                             State.STARTED)
+
+            # Delete the document's vector data
+            VectorStore.get_embedding_vector().delete_by_document_id(document_id)
+
             # Vectorize the document paragraph by paragraph
             page(QuerySet(Paragraph).filter(document_id=document_id).values('id'), 5,
                  ListenerManagement.get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted,

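The hunk above is mostly a reordering of the embedding task: the revoke check now runs before the status is flipped to STARTED and before the old vectors are deleted. A minimal standalone sketch of that ordering follows; the dict-based stores and state strings are illustrative assumptions, not MaxKB's models.

# Illustrative sketch only: dicts stand in for the Document table and the vector store.
documents = {'doc-1': 'REVOKE'}               # revoked while the task sat in the queue
vectors = {'doc-1': ['existing embeddings']}


def is_the_task_interrupted(document_id):
    state = documents.get(document_id)
    return state is None or state == 'REVOKE'


def embed_document(document_id):
    # New ordering: bail out first, so a cancelled document keeps its status
    # and its existing vectors instead of being wiped and marked STARTED.
    if is_the_task_interrupted(document_id):
        return
    documents[document_id] = 'STARTED'
    vectors.pop(document_id, None)            # only now drop the stale vectors
    # ... re-embed paragraph by paragraph, re-checking the flag between pages ...


embed_document('doc-1')
assert vectors['doc-1'] == ['existing embeddings']   # untouched, as intended
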
@@ -7,6 +7,11 @@ import dataset
 from common.event import ListenerManagement
 from dataset.models import State, TaskType
+
+sql = """
+UPDATE "document"
+SET status ="replace"(status, '1', '3')
+"""


 def updateDocumentStatus(apps, schema_editor):
     ParagraphModel = apps.get_model('dataset', 'Paragraph')

@@ -43,5 +48,6 @@ class Migration(migrations.Migration):
             name='status',
             field=models.CharField(default=dataset.models.data_set.Status.__str__, max_length=20, verbose_name='状态'),
         ),
+        migrations.RunSQL(sql),
         migrations.RunPython(updateDocumentStatus)
     ]

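The raw SQL added above rewrites every document's status in place with replace(status, '1', '3'). Together with the Status(document.status)[TaskType.EMBEDDING] lookups elsewhere in this commit, that suggests status is a short string packing one single-character state code per task type, so a bulk character replace migrates one state code to another for all rows at once. A small sketch of that idea; the position and character mapping below is an illustrative assumption, not the project's actual encoding.

# Hypothetical packed-status encoding, for illustration only.
TASK_TYPES = ['EMBEDDING', 'GENERATE_PROBLEM']          # one character position each
STATE_NAMES = {'0': 'PENDING', '1': 'STARTED', '2': 'SUCCESS',
               '3': 'FAILURE', '4': 'REVOKE'}           # made-up digit-to-state map


def task_state(status, task_type):
    """Read one task's state out of the packed status string."""
    return STATE_NAMES[status[TASK_TYPES.index(task_type)]]


old_status = '12'                                # e.g. embedding running, generation done
new_status = old_status.replace('1', '3')        # what RunSQL(sql) does to every row
print(task_state(old_status, 'EMBEDDING'), '->', task_state(new_status, 'EMBEDDING'))
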
@@ -297,6 +297,9 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
                 ListenerManagement.update_status(QuerySet(Document).filter(id__in=document_id_list),
                                                  TaskType.EMBEDDING,
                                                  State.PENDING)
+                ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id__in=document_id_list),
+                                                 TaskType.EMBEDDING,
+                                                 State.PENDING)
                 embedding_by_document_list.delay(document_id_list, model_id)
             else:
                 update_embedding_dataset_id(pid_list, target_dataset_id)

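The added call resets the Paragraph rows to PENDING alongside the Document before the embedding task is queued. The completion figure shown in the web UI (the Object.values(status.aggs ...) reduce changed further down) appears to be an aggregation over per-paragraph states, so leaving paragraphs at SUCCESS would make a re-queued document look finished from the start. A rough Python mirror of that counting logic; the state names come from the diff, but the aggregation itself is an illustrative reimplementation, not MaxKB code.

from collections import Counter


def progress(paragraph_states):
    aggs = Counter(paragraph_states)          # roughly what status.aggs holds
    done = aggs.get('SUCCESS', 0)
    total = sum(aggs.values())
    return f'{done}/{total}'


print(progress(['SUCCESS'] * 8 + ['PENDING'] * 4))   # 8/12 -- stale counts
print(progress(['PENDING'] * 12))                    # 0/12 -- after the reset above
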
@@ -51,21 +51,28 @@ def get_generate_problem(llm_model, prompt, post_apply=lambda: None, is_the_task
     return generate_problem


-@celery_app.task(base=QueueOnce, once={'keys': ['document_id']},
-                 name='celery:generate_related_by_document')
-def generate_related_by_document_id(document_id, model_id, prompt):
-    try:
-        ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
-                                         TaskType.GENERATE_PROBLEM,
-                                         State.STARTED)
-        llm_model = get_llm_model(model_id)
-
+def get_is_the_task_interrupted(document_id):
     def is_the_task_interrupted():
         document = QuerySet(Document).filter(id=document_id).first()
         if document is None or Status(document.status)[TaskType.GENERATE_PROBLEM] == State.REVOKE:
             return True
         return False

+    return is_the_task_interrupted
+
+
+@celery_app.task(base=QueueOnce, once={'keys': ['document_id']},
+                 name='celery:generate_related_by_document')
+def generate_related_by_document_id(document_id, model_id, prompt):
+    try:
+        is_the_task_interrupted = get_is_the_task_interrupted(document_id)
+        if is_the_task_interrupted():
+            return
+        ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
+                                         TaskType.GENERATE_PROBLEM,
+                                         State.STARTED)
+        llm_model = get_llm_model(model_id)
+
         # Problem-generation function
         generate_problem = get_generate_problem(llm_model, prompt,
                                                 ListenerManagement.get_aggregation_document_status(

@@ -82,6 +89,12 @@ def generate_related_by_document_id(document_id, model_id, prompt):
                  name='celery:generate_related_by_paragraph_list')
 def generate_related_by_paragraph_id_list(document_id, paragraph_id_list, model_id, prompt):
     try:
+        is_the_task_interrupted = get_is_the_task_interrupted(document_id)
+        if is_the_task_interrupted():
+            ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
+                                             TaskType.GENERATE_PROBLEM,
+                                             State.REVOKED)
+            return
         ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
                                          TaskType.GENERATE_PROBLEM,
                                          State.STARTED)

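The two hunks above lift the revoke check into a shared get_is_the_task_interrupted(document_id) factory and run it before either generation task marks the document STARTED; the paragraph-list task additionally records REVOKED when it exits early. A condensed standalone sketch of that flow, using a dict-based status store and plain functions instead of Celery tasks; all names below are illustrative.

status_store = {'doc-1': 'REVOKE'}      # the UI requested a cancel while the task was queued


def get_is_the_task_interrupted(document_id):
    def is_the_task_interrupted():
        state = status_store.get(document_id)
        return state is None or state == 'REVOKE'
    return is_the_task_interrupted


def generate_related_by_document(document_id):
    is_interrupted = get_is_the_task_interrupted(document_id)
    if is_interrupted():
        return                                   # never flips the status to STARTED
    status_store[document_id] = 'STARTED'


def generate_related_by_paragraphs(document_id):
    is_interrupted = get_is_the_task_interrupted(document_id)
    if is_interrupted():
        status_store[document_id] = 'REVOKED'    # acknowledge the cancel, then stop
        return
    status_store[document_id] = 'STARTED'


generate_related_by_paragraphs('doc-1')
print(status_store['doc-1'])                     # REVOKED
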
@@ -102,6 +102,7 @@ def embedding_by_dataset(dataset_id, model_id):
     max_kb.info(f"数据集文档:{[d.name for d in document_list]}")
     for document in document_list:
         try:
+            print(document.id, model_id)
             embedding_by_document.delay(document.id, model_id)
         except Exception as e:
             pass

@@ -32,9 +32,11 @@ CELERY_WORKER_REDIRECT_STDOUTS = True
 CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
 CELERY_TASK_SOFT_TIME_LIMIT = 3600
 CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True
+CELERY_ACKS_LATE = True
+celery_once_path = os.path.join(celery_data_dir, "celery_once")
 CELERY_ONCE = {
     'backend': 'celery_once.backends.File',
-    'settings': {'location': os.path.join(celery_data_dir, "celery_once")}
+    'settings': {'location': celery_once_path}
 }
 CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
 CELERY_LOG_DIR = os.path.join(PROJECT_DIR, 'logs', 'celery')

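CELERY_ACKS_LATE = True is the old-style name for Celery's late-acknowledgement switch: the broker message is only acknowledged after the task returns, so a worker that dies mid-embedding leaves the message to be redelivered rather than lost. A minimal standalone sketch of the same settings on a plain Celery app; the broker URL and task body are illustrative assumptions.

from celery import Celery

app = Celery('sketch', broker='redis://localhost:6379/0')
app.conf.task_acks_late = True            # lowercase equivalent of CELERY_ACKS_LATE
app.conf.task_soft_time_limit = 3600
app.conf.worker_cancel_long_running_tasks_on_connection_loss = True


@app.task(bind=True)
def embed_document(self, document_id):
    # If the worker is killed while this body runs, the still-unacknowledged
    # message is requeued by the broker and picked up by another worker.
    print('embedding', document_id)
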
@@ -24,13 +24,19 @@
        </el-text>
      </el-col>
      <el-col :span="7">
+       <span
+         :style="{ color: [State.FAILURE, State.REVOKED].includes(status.state) ? '#F54A45' : '' }"
+       >
        完成
        {{
          Object.keys(status.aggs ? status.aggs : {})
            .filter((k) => k == State.SUCCESS)
            .map((k) => status.aggs[k])
            .reduce((x: any, y: any) => x + y, 0)
-       }}/{{ Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0) }}
+       }}/{{
+         Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0)
+       }}</span
+       >
      </el-col>
      <el-col :span="9">
        {{

@@ -235,7 +235,25 @@
           <template #default="{ row }">
             <div v-if="datasetDetail.type === '0'">
               <span class="mr-4">
-                <el-tooltip effect="dark" content="向量化" placement="top">
+                <el-tooltip
+                  effect="dark"
+                  v-if="
+                    ([State.STARTED, State.PENDING] as Array<string>).includes(
+                      getTaskState(row.status, TaskType.EMBEDDING)
+                    )
+                  "
+                  content="取消向量化"
+                  placement="top"
+                >
+                  <el-button
+                    type="primary"
+                    text
+                    @click.stop="cancelTask(row, TaskType.EMBEDDING)"
+                  >
+                    <AppIcon iconName="app-close" style="font-size: 16px"></AppIcon>
+                  </el-button>
+                </el-tooltip>
+                <el-tooltip v-else effect="dark" content="向量化" placement="top">
                   <el-button type="primary" text @click.stop="refreshDocument(row)">
                     <AppIcon iconName="app-document-refresh" style="font-size: 16px"></AppIcon>
                   </el-button>

@@ -255,9 +273,20 @@
                  </el-button>
                  <template #dropdown>
                    <el-dropdown-menu>
-                     <el-dropdown-item @click="openGenerateDialog(row)">
+                     <el-dropdown-item
+                       v-if="
+                         ([State.STARTED, State.PENDING] as Array<string>).includes(
+                           getTaskState(row.status, TaskType.GENERATE_PROBLEM)
+                         )
+                       "
+                       @click="cancelTask(row, TaskType.GENERATE_PROBLEM)"
+                     >
                        <el-icon><Connection /></el-icon>
-                       生成关联问题
+                       取消生成问题
+                     </el-dropdown-item>
+                     <el-dropdown-item v-else @click="openGenerateDialog(row)">
+                       <el-icon><Connection /></el-icon>
+                       生成问题
                      </el-dropdown-item>
                      <el-dropdown-item @click="openDatasetDialog(row)">
                        <AppIcon iconName="app-migrate"></AppIcon>

@@ -286,7 +315,11 @@
              <span class="mr-4">
                <el-tooltip
                  effect="dark"
-                 v-if="getTaskState(row.status, TaskType.EMBEDDING) == State.STARTED"
+                 v-if="
+                   ([State.STARTED, State.PENDING] as Array<string>).includes(
+                     getTaskState(row.status, TaskType.EMBEDDING)
+                   )
+                 "
                  content="取消向量化"
                  placement="top"
                >

@@ -318,7 +351,9 @@
                    >
                      <el-dropdown-item
                        v-if="
-                         getTaskState(row.status, TaskType.GENERATE_PROBLEM) == State.STARTED
+                         ([State.STARTED, State.PENDING] as Array<string>).includes(
+                           getTaskState(row.status, TaskType.GENERATE_PROBLEM)
+                         )
                        "
                        @click="cancelTask(row, TaskType.GENERATE_PROBLEM)"
                      >