refactor: replace try_lock and un_lock with RedisLock for improved locking mechanism

This commit is contained in:
CaptainB 2025-07-17 13:24:59 +08:00 committed by 刘瑞斌
parent 6f6be4c90a
commit 9e80a652c4
9 changed files with 89 additions and 45 deletions

View File

@ -19,7 +19,7 @@ from application.serializers.application_chat_record import ChatRecordSerializer
ApplicationChatRecordQuerySerializers ApplicationChatRecordQuerySerializers
from common.db.search import page_search from common.db.search import page_search
from common.exception.app_exception import AppApiException from common.exception.app_exception import AppApiException
from common.utils.lock import try_lock, un_lock from common.utils.lock import RedisLock
class VoteRequest(serializers.Serializer): class VoteRequest(serializers.Serializer):
@ -48,7 +48,8 @@ class VoteSerializer(serializers.Serializer):
if with_valid: if with_valid:
self.is_valid(raise_exception=True) self.is_valid(raise_exception=True)
VoteRequest(data=instance).is_valid(raise_exception=True) VoteRequest(data=instance).is_valid(raise_exception=True)
if not try_lock(self.data.get('chat_record_id')): rlock = RedisLock()
if not rlock.try_lock(self.data.get('chat_record_id')):
raise AppApiException(500, raise AppApiException(500,
gettext( gettext(
"Voting on the current session minutes, please do not send repeated requests")) "Voting on the current session minutes, please do not send repeated requests"))
@ -75,7 +76,7 @@ class VoteSerializer(serializers.Serializer):
else: else:
raise AppApiException(500, gettext("Already voted, please cancel first and then vote again")) raise AppApiException(500, gettext("Already voted, please cancel first and then vote again"))
finally: finally:
un_lock(self.data.get('chat_record_id')) rlock.un_lock(self.data.get('chat_record_id'))
ChatCountSerializer(data={'chat_id': self.data.get('chat_id')}).update_chat() ChatCountSerializer(data={'chat_id': self.data.get('chat_id')}).update_chat()
return True return True

View File

@ -12,6 +12,7 @@ from django.utils.translation import gettext as _
from .listener_manage import * from .listener_manage import *
from ..constants.cache_version import Cache_Version from ..constants.cache_version import Cache_Version
from ..db.sql_execute import update_execute from ..db.sql_execute import update_execute
from ..utils.lock import RedisLock
update_document_status_sql = """ update_document_status_sql = """
UPDATE "public"."document" UPDATE "public"."document"
@ -22,8 +23,8 @@ update_document_status_sql = """
def run(): def run():
from models_provider.models import Model, Status from models_provider.models import Model, Status
rlock = RedisLock()
if try_lock('event_init', 30 * 30): if rlock.try_lock('event_init', 30 * 30):
try: try:
# 修改Model状态为ERROR # 修改Model状态为ERROR
QuerySet(Model).filter( QuerySet(Model).filter(
@ -36,4 +37,4 @@ def run():
version, get_key = Cache_Version.SYSTEM.value version, get_key = Cache_Version.SYSTEM.value
cache.delete(get_key(key='rsa_key'), version=version) cache.delete(get_key(key='rsa_key'), version=version)
finally: finally:
un_lock('event_init') rlock.un_lock('event_init')

View File

@ -20,7 +20,7 @@ from langchain_core.embeddings import Embeddings
from common.config.embedding_config import VectorStore from common.config.embedding_config import VectorStore
from common.db.search import native_search, get_dynamics_model, native_update from common.db.search import native_search, get_dynamics_model, native_update
from common.utils.common import get_file_content from common.utils.common import get_file_content
from common.utils.lock import try_lock, un_lock from common.utils.lock import RedisLock
from common.utils.logger import maxkb_logger from common.utils.logger import maxkb_logger
from common.utils.page_utils import page_desc from common.utils.page_utils import page_desc
from knowledge.models import Paragraph, Status, Document, ProblemParagraphMapping, TaskType, State,SourceType, SearchMode from knowledge.models import Paragraph, Status, Document, ProblemParagraphMapping, TaskType, State,SourceType, SearchMode
@ -253,7 +253,8 @@ class ListenerManagement:
""" """
if state_list is None: if state_list is None:
state_list = [State.PENDING, State.SUCCESS, State.FAILURE, State.REVOKE, State.REVOKED] state_list = [State.PENDING, State.SUCCESS, State.FAILURE, State.REVOKE, State.REVOKED]
if not try_lock('embedding:' + str(document_id)): rlock = RedisLock()
if not rlock.try_lock('embedding:' + str(document_id)):
return return
try: try:
def is_the_task_interrupted(): def is_the_task_interrupted():
@ -290,7 +291,7 @@ class ListenerManagement:
ListenerManagement.post_update_document_status(document_id, TaskType.EMBEDDING) ListenerManagement.post_update_document_status(document_id, TaskType.EMBEDDING)
ListenerManagement.get_aggregation_document_status(document_id)() ListenerManagement.get_aggregation_document_status(document_id)()
maxkb_logger.info(_('End--->Embedding document: {document_id}').format(document_id=document_id)) maxkb_logger.info(_('End--->Embedding document: {document_id}').format(document_id=document_id))
un_lock('embedding:' + str(document_id)) rlock.un_lock('embedding:' + str(document_id))
@staticmethod @staticmethod
def embedding_by_knowledge(knowledge_id, embedding_model: Embeddings): def embedding_by_knowledge(knowledge_id, embedding_model: Embeddings):

View File

@ -8,7 +8,7 @@ from django.utils import timezone
from application.models import Application, Chat, ChatRecord from application.models import Application, Chat, ChatRecord
from common.job.scheduler import scheduler from common.job.scheduler import scheduler
from common.utils.lock import try_lock, un_lock, lock from common.utils.lock import lock, RedisLock
from common.utils.logger import maxkb_logger from common.utils.logger import maxkb_logger
from knowledge.models import File from knowledge.models import File
@ -70,7 +70,8 @@ def clean_chat_log_job_lock():
def run(): def run():
if try_lock('clean_chat_log_job', 30 * 30): rlock = RedisLock()
if rlock.try_lock('clean_chat_log_job', 30 * 30):
try: try:
maxkb_logger.debug('get lock clean_chat_log_job') maxkb_logger.debug('get lock clean_chat_log_job')
@ -79,4 +80,4 @@ def run():
existing_job.remove() existing_job.remove()
scheduler.add_job(clean_chat_log_job, 'cron', hour='0', minute='5', id='clean_chat_log') scheduler.add_job(clean_chat_log_job, 'cron', hour='0', minute='5', id='clean_chat_log')
finally: finally:
un_lock('clean_chat_log_job') rlock.un_lock('clean_chat_log_job')

View File

@ -5,7 +5,7 @@ from django.db.models import Q
from django.utils import timezone from django.utils import timezone
from common.job.scheduler import scheduler from common.job.scheduler import scheduler
from common.utils.lock import un_lock, try_lock, lock from common.utils.lock import lock, RedisLock
from common.utils.logger import maxkb_logger from common.utils.logger import maxkb_logger
from knowledge.models import File, FileSourceType from knowledge.models import File, FileSourceType
@ -25,12 +25,14 @@ def clean_debug_file_lock():
File.objects.filter( File.objects.filter(
Q(create_time__lt=one_days_ago, source_type=FileSourceType.TEMPORARY_1_DAY.value) | Q(create_time__lt=one_days_ago, source_type=FileSourceType.TEMPORARY_1_DAY.value) |
Q(create_time__lt=two_hours_ago, source_type=FileSourceType.TEMPORARY_120_MINUTE.value) | Q(create_time__lt=two_hours_ago, source_type=FileSourceType.TEMPORARY_120_MINUTE.value) |
Q(create_time__lt=minutes_30_ago, source_type=FileSourceType.TEMPORARY_30_MINUTE.value)).delete() Q(create_time__lt=minutes_30_ago, source_type=FileSourceType.TEMPORARY_30_MINUTE.value)
).delete()
maxkb_logger.debug(_('end clean debug file')) maxkb_logger.debug(_('end clean debug file'))
def run(): def run():
if try_lock('clean_debug_file', 30 * 30): rlock = RedisLock()
if rlock.try_lock('clean_debug_file', 30 * 30):
try: try:
maxkb_logger.debug('get lock clean_debug_file') maxkb_logger.debug('get lock clean_debug_file')
@ -39,4 +41,4 @@ def run():
clean_debug_file_job.remove() clean_debug_file_job.remove()
scheduler.add_job(clean_debug_file, 'cron', hour='*', minute='*/30', second='0', id='clean_debug_file') scheduler.add_job(clean_debug_file, 'cron', hour='*', minute='*/30', second='0', id='clean_debug_file')
finally: finally:
un_lock('clean_debug_file') rlock.un_lock('clean_debug_file')

View File

@ -11,7 +11,7 @@ from django.db.models import QuerySet
from application.models import ApplicationChatUserStats from application.models import ApplicationChatUserStats
from common.job.scheduler import scheduler from common.job.scheduler import scheduler
from common.utils.lock import try_lock, un_lock, lock from common.utils.lock import lock, RedisLock
from common.utils.logger import maxkb_logger from common.utils.logger import maxkb_logger
@ -28,7 +28,8 @@ def client_access_num_reset_job_lock():
def run(): def run():
if try_lock('access_num_reset', 30 * 30): rlock = RedisLock()
if rlock.try_lock('access_num_reset', 30 * 30):
try: try:
maxkb_logger.debug('get lock access_num_reset') maxkb_logger.debug('get lock access_num_reset')
@ -38,4 +39,4 @@ def run():
scheduler.add_job(client_access_num_reset_job, 'cron', hour='0', minute='0', second='0', scheduler.add_job(client_access_num_reset_job, 'cron', hour='0', minute='0', second='0',
id='access_num_reset') id='access_num_reset')
finally: finally:
un_lock('access_num_reset') rlock.un_lock('access_num_reset')

View File

@ -1,3 +1,5 @@
import subprocess
from .base import BaseService from .base import BaseService
from ..hands import * from ..hands import *
@ -35,3 +37,15 @@ class GunicornService(BaseService):
@property @property
def cwd(self): def cwd(self):
return APPS_DIR return APPS_DIR
def open_subprocess(self):
# 复制当前环境变量,并设置 ENABLE_SCHEDULER=1
env = os.environ.copy()
env['ENABLE_SCHEDULER'] = '1'
kwargs = {
'cwd': self.cwd,
'stderr': self.log_file,
'stdout': self.log_file,
'env': env
}
self._process = subprocess.Popen(self.cmd, **kwargs)

View File

@ -6,32 +6,47 @@
@date2023/9/11 11:45 @date2023/9/11 11:45
@desc: @desc:
""" """
from datetime import timedelta from functools import wraps
import uuid_utils.compat as uuid
from django.core.cache import caches from django.core.cache import caches
from django_redis import get_redis_connection
memory_cache = caches['default'] memory_cache = caches['default']
class RedisLock():
def __init__(self):
self.lock_value = None
def try_lock(key: str, timeout=None): def try_lock(self, key: str, timeout=None):
""" """
获取锁 获取锁
:param key: 获取锁 key :param key: 获取锁 key
:param timeout 超时时间 :param timeout 超时时间
:return: 是否获取到锁 :return: 是否获取到锁
""" """
if timeout is None: redis_client = get_redis_connection("default")
timeout = 3600 # 默认超时时间为3600秒 if timeout is None:
return memory_cache.add(key, 'lock', timeout=timeout) timeout = 3600 # 默认超时时间为3600秒
self.lock_value = str(uuid.uuid7())
return redis_client.set(key, self.lock_value, nx=True, ex=timeout)
def un_lock(key: str): def un_lock(self, key: str):
""" """
解锁 解锁
:param key: 解锁 key :param key: 解锁 key
:return: 是否解锁成功 :return: 是否解锁成功
""" """
return memory_cache.delete(key) redis_client = get_redis_connection("default")
unlock_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
redis_client.eval(unlock_script, 1, key, self.lock_value)
def lock(lock_key, timeout=None): def lock(lock_key, timeout=None):
@ -43,15 +58,19 @@ def lock(lock_key, timeout=None):
""" """
def inner(func): def decorator(func):
def run(*args, **kwargs): @wraps(func)
def wrapper(*args, **kwargs):
key = lock_key(*args, **kwargs) if callable(lock_key) else lock_key key = lock_key(*args, **kwargs) if callable(lock_key) else lock_key
rlock = RedisLock()
if not rlock.try_lock(key, timeout):
# 获取锁失败,可自定义异常或返回
return None
try: try:
if try_lock(key=key, timeout=timeout): return func(*args, **kwargs)
return func(*args, **kwargs)
finally: finally:
un_lock(key=key) rlock.un_lock(key)
return run return wrapper
return inner return decorator

View File

@ -26,4 +26,8 @@ def post_handler():
job.run() job.run()
DatabaseModelManage.init() DatabaseModelManage.init()
post_handler() # 仅在web中启动定时任务local_model celery 不需要
if os.environ.get('ENABLE_SCHEDULER') == '1':
post_handler()