fix: 修复文档状态数据错误问题 (#1697)

This commit is contained in:
shaohuzhang1 2024-11-26 16:24:03 +08:00 committed by GitHub
parent 52575360ed
commit 93a5c6eb2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 130 additions and 110 deletions

View File

@ -36,8 +36,9 @@ def update_execute(sql: str, params):
""" """
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.execute(sql, params) cursor.execute(sql, params)
affected_rows = cursor.rowcount
cursor.close() cursor.close()
return None return affected_rows
def select_list(sql: str, params: List): def select_list(sql: str, params: List):

View File

@ -10,11 +10,12 @@ import datetime
import logging import logging
import os import os
import threading import threading
import time
import traceback import traceback
from typing import List from typing import List
import django.db.models import django.db.models
from django.db import models from django.db import models, transaction
from django.db.models import QuerySet from django.db.models import QuerySet
from django.db.models.functions import Substr, Reverse from django.db.models.functions import Substr, Reverse
from langchain_core.embeddings import Embeddings from langchain_core.embeddings import Embeddings
@ -168,6 +169,7 @@ class ListenerManagement:
@staticmethod @staticmethod
def get_aggregation_document_status(document_id): def get_aggregation_document_status(document_id):
def aggregation_document_status(): def aggregation_document_status():
pass
sql = get_file_content( sql = get_file_content(
os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql')) os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql'))
native_update({'document_custom_sql': QuerySet(Document).filter(id=document_id)}, sql, with_table_name=True) native_update({'document_custom_sql': QuerySet(Document).filter(id=document_id)}, sql, with_table_name=True)

View File

@ -18,10 +18,11 @@ def page(query_set, page_size, handler, is_the_task_interrupted=lambda: False):
@param is_the_task_interrupted: 任务是否被中断 @param is_the_task_interrupted: 任务是否被中断
@return: @return:
""" """
query = query_set.order_by("id")
count = query_set.count() count = query_set.count()
for i in range(0, ceil(count / page_size)): for i in range(0, ceil(count / page_size)):
if is_the_task_interrupted(): if is_the_task_interrupted():
return return
offset = i * page_size offset = i * page_size
paragraph_list = query_set[offset: offset + page_size] paragraph_list = query.all()[offset: offset + page_size]
handler(paragraph_list) handler(paragraph_list)

View File

@ -613,7 +613,8 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
document_id = self.data.get("document_id") document_id = self.data.get("document_id")
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING,
State.PENDING) State.PENDING)
ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id=document_id), TaskType.EMBEDDING, ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id=document_id),
TaskType.EMBEDDING,
State.PENDING) State.PENDING)
ListenerManagement.get_aggregation_document_status(document_id)() ListenerManagement.get_aggregation_document_status(document_id)()
embedding_model_id = get_embedding_model_id_by_dataset_id(dataset_id=self.data.get('dataset_id')) embedding_model_id = get_embedding_model_id_by_dataset_id(dataset_id=self.data.get('dataset_id'))
@ -708,8 +709,8 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
@staticmethod @staticmethod
def post_embedding(result, document_id, dataset_id): def post_embedding(result, document_id, dataset_id):
model_id = get_embedding_model_id_by_dataset_id(dataset_id) DocumentSerializers.Operate(
embedding_by_document.delay(document_id, model_id) data={'dataset_id': dataset_id, 'document_id': document_id}).refresh()
return result return result
@staticmethod @staticmethod
@ -907,8 +908,8 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
@staticmethod @staticmethod
def post_embedding(document_list, dataset_id): def post_embedding(document_list, dataset_id):
for document_dict in document_list: for document_dict in document_list:
model_id = get_embedding_model_id_by_dataset_id(dataset_id) DocumentSerializers.Operate(
embedding_by_document.delay(document_dict.get('id'), model_id) data={'dataset_id': dataset_id, 'document_id': document_dict.get('id')}).refresh()
return document_list return document_list
@post(post_function=post_embedding) @post(post_function=post_embedding)

View File

@ -1,51 +1,13 @@
<template> <template>
<el-popover placement="top" :width="450" trigger="hover"> <el-popover v-model:visible="visible" placement="top" :width="450" trigger="hover">
<template #default> <template #default
<el-row :gutter="3" v-for="status in statusTable" :key="status.type"> ><StatusTable
<el-col :span="4">{{ taskTypeMap[status.type] }} </el-col> v-if="visible"
<el-col :span="4"> :status="status"
<el-text v-if="status.state === State.SUCCESS || status.state === State.REVOKED"> :statusMeta="statusMeta"
<el-icon class="success"><SuccessFilled /></el-icon> :taskTypeMap="taskTypeMap"
{{ stateMap[status.state](status.type) }} :stateMap="stateMap"
</el-text> ></StatusTable>
<el-text v-else-if="status.state === State.FAILURE">
<el-icon class="danger"><CircleCloseFilled /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.STARTED">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.PENDING">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="aggStatus?.value === State.REVOKE">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[aggStatus.value](aggStatus.key) }}
</el-text>
</el-col>
<el-col :span="5">
完成
{{
Object.keys(status.aggs ? status.aggs : {})
.filter((k) => k == State.SUCCESS)
.map((k) => status.aggs[k])
.reduce((x: any, y: any) => x + y, 0)
}}/{{
Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0)
}}
</el-col>
<el-col :span="9">
{{
status.time
? status.time[
status.state == State.REVOKED ? State.REVOKED : State.PENDING
]?.substring(0, 19)
: undefined
}}
</el-col>
</el-row>
</template> </template>
<template #reference> <template #reference>
<el-text v-if="aggStatus?.value === State.SUCCESS || aggStatus?.value === State.REVOKED"> <el-text v-if="aggStatus?.value === State.SUCCESS || aggStatus?.value === State.REVOKED">
@ -72,11 +34,11 @@
</el-popover> </el-popover>
</template> </template>
<script setup lang="ts"> <script setup lang="ts">
import { computed } from 'vue' import { computed, ref } from 'vue'
import { Status, TaskType, State, type TaskTypeInterface } from '@/utils/status' import { TaskType, State } from '@/utils/status'
import { mergeWith } from 'lodash' import StatusTable from '@/views/document/component/StatusTable.vue'
const props = defineProps<{ status: string; statusMeta: any }>() const props = defineProps<{ status: string; statusMeta: any }>()
const visible = ref<boolean>(false)
const checkList: Array<string> = [ const checkList: Array<string> = [
State.REVOKE, State.REVOKE,
State.STARTED, State.STARTED,
@ -112,56 +74,5 @@ const stateMap: any = {
[State.FAILURE]: (type: number) => '失败', [State.FAILURE]: (type: number) => '失败',
[State.SUCCESS]: (type: number) => '成功' [State.SUCCESS]: (type: number) => '成功'
} }
const parseAgg = (agg: { count: number; status: string }) => {
const status = new Status(agg.status)
return Object.keys(TaskType)
.map((key) => {
const value = TaskType[key as keyof TaskTypeInterface]
return { [value]: { [status.task_status[value]]: agg.count } }
})
.reduce((x, y) => ({ ...x, ...y }), {})
}
const customizer: (x: any, y: any) => any = (objValue: any, srcValue: any) => {
if (objValue == undefined && srcValue) {
return srcValue
}
if (srcValue == undefined && objValue) {
return objValue
}
//
if (typeof objValue === 'object' && typeof srcValue === 'object') {
// object
return mergeWith(objValue, srcValue, customizer)
} else {
//
return objValue + srcValue
}
}
const aggs = computed(() => {
return (props.statusMeta.aggs ? props.statusMeta.aggs : [])
.map((agg: any) => {
return parseAgg(agg)
})
.reduce((x: any, y: any) => {
return mergeWith(x, y, customizer)
}, {})
})
const statusTable = computed(() => {
return Object.keys(TaskType)
.map((key) => {
const value = TaskType[key as keyof TaskTypeInterface]
const parseStatus = new Status(props.status)
return {
type: value,
state: parseStatus.task_status[value],
aggs: aggs.value[value],
time: props.statusMeta.state_time[value]
}
})
.filter((item) => item.state !== State.IGNORED)
})
</script> </script>
<style lang="scss" scoped></style> <style lang="scss" scoped></style>

View File

@ -0,0 +1,104 @@
<template>
<!-- Renders one row per entry of statusTable: task type label, state icon and text, success/total counts, and a timestamp. -->
<el-row :gutter="3" v-for="status in statusTable" :key="status.type">
<el-col :span="4">{{ taskTypeMap[status.type] }} </el-col>
<!-- State column: SUCCESS and REVOKED share the success icon, FAILURE uses the danger icon, STARTED / PENDING / REVOKE show a spinning loader. The label text comes from stateMap. -->
<el-col :span="4">
<el-text v-if="status.state === State.SUCCESS || status.state === State.REVOKED">
<el-icon class="success"><SuccessFilled /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.FAILURE">
<el-icon class="danger"><CircleCloseFilled /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.STARTED">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.PENDING">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
<el-text v-else-if="status.state === State.REVOKE">
<el-icon class="is-loading primary"><Loading /></el-icon>
{{ stateMap[status.state](status.type) }}
</el-text>
</el-col>
<!-- Progress column ("完成" means "completed"): sum of the counts stored under the SUCCESS key of status.aggs, then a slash, then the sum of all counts in status.aggs. A missing status.aggs is treated as an empty object, giving 0/0. -->
<el-col :span="7">
完成
{{
Object.keys(status.aggs ? status.aggs : {})
.filter((k) => k == State.SUCCESS)
.map((k) => status.aggs[k])
.reduce((x: any, y: any) => x + y, 0)
}}/{{ Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0) }}
</el-col>
<!-- Time column: reads the REVOKED entry of status.time when the row state is REVOKED, otherwise the PENDING entry, and keeps only its first 19 characters. Renders nothing when status.time is missing. -->
<el-col :span="9">
{{
status.time
? status.time[status.state == State.REVOKED ? State.REVOKED : State.PENDING]?.substring(
0,
19
)
: undefined
}}
</el-col>
</el-row>
</template>
<script setup lang="ts">
import { computed } from 'vue'
import { Status, TaskType, State, type TaskTypeInterface } from '@/utils/status'
import { mergeWith } from 'lodash'
const props = defineProps<{ status: string; statusMeta: any; stateMap: any; taskTypeMap: any }>()
/**
 * Expand one aggregation entry into a lookup of the form
 * `{ [taskType]: { [state]: count } }`.
 *
 * For every value of `TaskType`, the state is read from the `task_status` of
 * the `Status` object built from `agg.status`, and `agg.count` is recorded
 * under that state.
 */
const parseAgg = (agg: { count: number; status: string }) => {
  const parsedStatus = new Status(agg.status)
  const countsByTaskType: any = {}
  for (const taskKey of Object.keys(TaskType)) {
    const taskType = TaskType[taskKey as keyof TaskTypeInterface]
    countsByTaskType[taskType] = { [parsedStatus.task_status[taskType]]: agg.count }
  }
  return countsByTaskType
}
/**
 * Customizer handed to lodash `mergeWith` to combine two aggregation trees.
 *
 * - If one side is missing (`undefined` or `null`), the other side is used as-is.
 * - If both sides are objects, they are merged recursively with this customizer.
 * - Otherwise both sides are combined with `+` (counter addition).
 *
 * The missing-side checks intentionally do not look at the truthiness of the
 * other operand: a legitimate `0` counter must be kept, not turned into `NaN`
 * by falling through to `undefined + 0`.
 */
const customizer: (x: any, y: any) => any = (objValue: any, srcValue: any) => {
  // Loose equality on purpose: matches both undefined and null.
  if (objValue == undefined) {
    return srcValue
  }
  if (srcValue == undefined) {
    return objValue
  }
  if (typeof objValue === 'object' && typeof srcValue === 'object') {
    // Nested maps: recurse so that leaf counters are summed.
    return mergeWith(objValue, srcValue, customizer)
  }
  // Leaf values: add them together.
  return objValue + srcValue
}
/**
 * Aggregated counters for the whole document.
 *
 * Each entry of `props.statusMeta.aggs` (an empty list when that field is
 * falsy) is expanded with `parseAgg`, and the results are folded into a single
 * object with `mergeWith` + `customizer`, starting from `{}`.
 */
const aggs = computed(() => {
  const rawAggs: Array<any> = props.statusMeta.aggs || []
  const perAggCounts = rawAggs.map((item: any) => parseAgg(item))
  return perAggCounts.reduce(
    (merged: any, current: any) => mergeWith(merged, current, customizer),
    {}
  )
})
/**
 * Rows displayed by the template: one per `TaskType` value whose state is not
 * `State.IGNORED`. Each row carries the task type, its state taken from the
 * parsed `props.status`, the matching entry of `aggs`, and the matching entry
 * of `props.statusMeta.state_time`.
 */
const statusTable = computed(() => {
  // The parsed status does not depend on the task type, so build it once per
  // recomputation instead of once per row.
  const parsedStatus = new Status(props.status)
  // Tolerate a statusMeta without state_time; the template already renders
  // nothing when a row has no time.
  const stateTime = props.statusMeta.state_time
  return Object.keys(TaskType)
    .map((key) => {
      const value = TaskType[key as keyof TaskTypeInterface]
      return {
        type: value,
        state: parsedStatus.task_status[value],
        aggs: aggs.value[value],
        time: stateTime ? stateTime[value] : undefined
      }
    })
    .filter((item) => item.state !== State.IGNORED)
})
</script>
<style lang="scss"></style>