refactor: Workflow execution logic (#1886)
This commit is contained in:
parent
abef79efa6
commit
a00af1e288
@ -29,7 +29,7 @@ from function_lib.models.function import FunctionLib
|
|||||||
from setting.models import Model
|
from setting.models import Model
|
||||||
from setting.models_provider import get_model_credential
|
from setting.models_provider import get_model_credential
|
||||||
|
|
||||||
executor = ThreadPoolExecutor(max_workers=50)
|
executor = ThreadPoolExecutor(max_workers=200)
|
||||||
|
|
||||||
|
|
||||||
class Edge:
|
class Edge:
|
||||||
@ -271,7 +271,7 @@ class WorkflowManage:
|
|||||||
self.current_result = None
|
self.current_result = None
|
||||||
self.answer = ""
|
self.answer = ""
|
||||||
self.answer_list = ['']
|
self.answer_list = ['']
|
||||||
self.status = 0
|
self.status = 200
|
||||||
self.base_to_response = base_to_response
|
self.base_to_response = base_to_response
|
||||||
self.chat_record = chat_record
|
self.chat_record = chat_record
|
||||||
self.await_future_map = {}
|
self.await_future_map = {}
|
||||||
@ -384,8 +384,23 @@ class WorkflowManage:
|
|||||||
'', True, message_tokens, answer_tokens, {})
|
'', True, message_tokens, answer_tokens, {})
|
||||||
|
|
||||||
def run_chain_async(self, current_node, node_result_future):
|
def run_chain_async(self, current_node, node_result_future):
|
||||||
future = executor.submit(self.run_chain, current_node, node_result_future)
|
return executor.submit(self.run_chain_manage, current_node, node_result_future)
|
||||||
return future
|
|
||||||
|
def run_chain_manage(self, current_node, node_result_future):
|
||||||
|
if current_node is None:
|
||||||
|
start_node = self.get_start_node()
|
||||||
|
current_node = get_node(start_node.type)(start_node, self.params, self)
|
||||||
|
result = self.run_chain(current_node, node_result_future)
|
||||||
|
node_list = self.get_next_node_list(current_node, result)
|
||||||
|
if len(node_list) == 1:
|
||||||
|
self.run_chain_manage(node_list[0], None)
|
||||||
|
elif len(node_list) > 1:
|
||||||
|
|
||||||
|
# 获取到可执行的子节点
|
||||||
|
result_list = [{'node': node, 'future': executor.submit(self.run_chain_manage, node, None)} for node in
|
||||||
|
node_list]
|
||||||
|
self.set_await_map(result_list)
|
||||||
|
[r.get('future').result() for r in result_list]
|
||||||
|
|
||||||
def set_await_map(self, node_run_list):
|
def set_await_map(self, node_run_list):
|
||||||
sorted_node_run_list = sorted(node_run_list, key=lambda n: n.get('node').node.y)
|
sorted_node_run_list = sorted(node_run_list, key=lambda n: n.get('node').node.y)
|
||||||
@ -395,9 +410,6 @@ class WorkflowManage:
|
|||||||
for i in range(index)]
|
for i in range(index)]
|
||||||
|
|
||||||
def run_chain(self, current_node, node_result_future=None):
|
def run_chain(self, current_node, node_result_future=None):
|
||||||
if current_node is None:
|
|
||||||
start_node = self.get_start_node()
|
|
||||||
current_node = get_node(start_node.type)(start_node, self.params, self)
|
|
||||||
if node_result_future is None:
|
if node_result_future is None:
|
||||||
node_result_future = self.run_node_future(current_node)
|
node_result_future = self.run_node_future(current_node)
|
||||||
try:
|
try:
|
||||||
@ -409,18 +421,10 @@ class WorkflowManage:
|
|||||||
result = self.hand_event_node_result(current_node,
|
result = self.hand_event_node_result(current_node,
|
||||||
node_result_future) if is_stream else self.hand_node_result(
|
node_result_future) if is_stream else self.hand_node_result(
|
||||||
current_node, node_result_future)
|
current_node, node_result_future)
|
||||||
with self.lock:
|
return result
|
||||||
if current_node.status == 500:
|
|
||||||
return
|
|
||||||
node_list = self.get_next_node_list(current_node, result)
|
|
||||||
# 获取到可执行的子节点
|
|
||||||
result_list = [{'node': node, 'future': self.run_chain_async(node, None)} for node in node_list]
|
|
||||||
self.set_await_map(result_list)
|
|
||||||
[r.get('future').result() for r in result_list]
|
|
||||||
if self.status == 0:
|
|
||||||
self.status = 200
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
return []
|
||||||
|
|
||||||
def hand_node_result(self, current_node, node_result_future):
|
def hand_node_result(self, current_node, node_result_future):
|
||||||
try:
|
try:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user