perf: Optimize workflow logic (#1996)

This commit is contained in:
shaohuzhang1 2025-01-08 16:05:27 +08:00 committed by GitHub
parent b07641cf66
commit 3e327d52d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -342,15 +342,19 @@ class WorkflowManage:
self.run_chain_async(current_node, node_result_future) self.run_chain_async(current_node, node_result_future)
return tools.to_stream_response_simple(self.await_result()) return tools.to_stream_response_simple(self.await_result())
def is_run(self, timeout=0.1): def is_run(self, timeout=0.5):
self.lock.acquire() future_list_len = len(self.future_list)
try: try:
r = concurrent.futures.wait(self.future_list, timeout) r = concurrent.futures.wait(self.future_list, timeout)
return len(r.not_done) > 0 if len(r.not_done) > 0:
return True
else:
if future_list_len == len(self.future_list):
return False
else:
return True
except Exception as e: except Exception as e:
return True return True
finally:
self.lock.release()
def await_result(self): def await_result(self):
try: try:
@ -403,12 +407,8 @@ class WorkflowManage:
# 获取到可执行的子节点 # 获取到可执行的子节点
result_list = [{'node': node, 'future': executor.submit(self.run_chain_manage, node, None)} for node in result_list = [{'node': node, 'future': executor.submit(self.run_chain_manage, node, None)} for node in
sorted_node_run_list] sorted_node_run_list]
try: for r in result_list:
self.lock.acquire() self.future_list.append(r.get('future'))
for r in result_list:
self.future_list.append(r.get('future'))
finally:
self.lock.release()
def run_chain(self, current_node, node_result_future=None): def run_chain(self, current_node, node_result_future=None):
if node_result_future is None: if node_result_future is None: