# -*- coding: utf-8 -*- """ 音视频处理器 - MaxKB集成层 """ from typing import List from common.handle.base_split_handle import BaseSplitHandle from common.utils.logger import maxkb_logger class MediaSplitHandle(BaseSplitHandle): """ 音视频处理器 - MaxKB集成层 """ def __init__(self): super().__init__() self.adapter = None def support(self, file, get_buffer, **kwargs): """检查是否支持该文件类型""" file_name = file.name.lower() # 支持的音频格式 audio_exts = ('.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma') # 支持的视频格式 video_exts = ('.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv') return any(file_name.endswith(ext) for ext in audio_exts + video_exts) def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image, **kwargs): """处理音视频文件""" maxkb_logger.info(f"MediaSplitHandle.handle called with file: {file.name}") maxkb_logger.info(f"Split parameters - limit: {limit}, patterns: {pattern_list}, with_filter: {with_filter}") # 检查是否需要实际处理 use_actual_processing = kwargs.get('use_actual_processing', False) stt_model_id = kwargs.get('stt_model_id') if use_actual_processing and stt_model_id: # 进行实际处理 result = self._handle_actual_processing(file, get_buffer, **kwargs) # 应用智能分块(如果需要) chunk_limit = limit if limit > 0 else 1000 # 默认1000字符 if len(result.get('content', [])) > 0: result = self._apply_smart_split(result, chunk_limit, with_filter) return result else: # 使用默认文本 result = self._handle_default_text(file, **kwargs) # 即使是默认文本也应用分块(如果有limit参数) if limit > 0: result = self._apply_smart_split(result, limit, with_filter) return result def _get_audio_default_segments(self, file_name: str) -> List[dict]: """生成音频文件的默认段落""" base_name = file_name.split('.')[0] return [ { 'title': '开场介绍', 'content': f'这是音频文件 "{base_name}" 的第一段内容演示。本段包含了会议的开场介绍和主要议题的说明。\n\n主要内容:\n- 会议目的和议程说明\n- 参会人员介绍\n- 会议背景和重要性\n- 预期成果和目标设定', 'start_time': 0, 'end_time': 180 }, { 'title': '主要内容讨论', 'content': f'这是音频文件 "{base_name}" 的第二段内容演示。本段详细讨论了项目的进展情况和下一步的工作计划。\n\n主要内容:\n- 项目当前进展汇报\n- 关键问题和挑战分析\n- 解决方案讨论\n- 资源需求和分配', 'start_time': 180, 'end_time': 360 }, { 'title': '总结与行动项', 'content': f'这是音频文件 "{base_name}" 的第三段内容演示。本段总结了会议的主要结论和行动项,明确了责任人和时间节点。\n\n主要内容:\n- 会议要点总结\n- 行动项和责任分配\n- 时间节点和里程碑\n- 后续跟进计划', 'start_time': 360, 'end_time': 540 } ] def _get_video_default_segments(self, file_name: str) -> List[dict]: """生成视频文件的默认段落""" base_name = file_name.split('.')[0] return [ { 'title': '开场介绍', 'content': f'这是视频文件 "{base_name}" 的第一段内容演示。本段包含了视频的开场介绍和主要内容概述。\n\n主要内容:\n- 产品/服务介绍\n- 功能特性概述\n- 目标用户群体\n- 使用场景说明', 'start_time': 0, 'end_time': 120 }, { 'title': '功能演示', 'content': f'这是视频文件 "{base_name}" 的第二段内容演示。本段详细展示了产品的功能特性和使用方法。\n\n主要内容:\n- 核心功能演示\n- 操作步骤说明\n- 使用技巧和注意事项\n- 常见问题解答', 'start_time': 120, 'end_time': 300 }, { 'title': '总结与联系方式', 'content': f'这是视频文件 "{base_name}" 的第三段内容演示。本段总结了产品的主要优势和适用场景,提供了联系方式。\n\n主要内容:\n- 产品优势总结\n- 价格和套餐信息\n- 适用场景和行业\n- 联系方式和售后服务', 'start_time': 300, 'end_time': 420 } ] def _get_media_default_segments(self, file_name: str) -> List[dict]: """生成其他媒体文件的默认段落""" base_name = file_name.split('.')[0] return [ { 'title': '文件概述', 'content': f'这是媒体文件 "{base_name}" 的第一段内容演示。本段包含了文件的基本信息和主要内容概述。\n\n主要内容:\n- 文件基本信息\n- 内容类型说明\n- 主要用途和价值\n- 处理建议和注意事项', 'start_time': 0, 'end_time': 120 }, { 'title': '详细内容', 'content': f'这是媒体文件 "{base_name}" 的第二段内容演示。本段详细介绍了文件的核心内容和关键信息。\n\n主要内容:\n- 核心内容分析\n- 关键信息提取\n- 重要要点总结\n- 后续处理建议', 'start_time': 120, 'end_time': 240 } ] def _handle_default_text(self, file, **kwargs) -> dict: """使用默认文本处理音视频文件""" maxkb_logger.info(f"Using default text for media processing: {file.name}") # 获取文件名和类型 file_name = file.name file_ext = file_name.lower().split('.')[-1] # 判断媒体类型 audio_exts = {'mp3', 'wav', 'm4a', 'flac', 'aac', 'ogg', 'wma'} video_exts = {'mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv'} if file_ext in audio_exts: media_type = "音频" default_segments = self._get_audio_default_segments(file_name) elif file_ext in video_exts: media_type = "视频" default_segments = self._get_video_default_segments(file_name) else: media_type = "媒体" default_segments = self._get_media_default_segments(file_name) maxkb_logger.info(f"Processing {media_type} file: {file_name}") maxkb_logger.info(f"Generating {len(default_segments)} default segments") # 转换为MaxKB段落格式 paragraphs = [] for i, segment_data in enumerate(default_segments): paragraph = { 'content': segment_data['content'], 'title': segment_data['title'], 'metadata': { 'start_time': segment_data.get('start_time'), 'end_time': segment_data.get('end_time'), 'index': i, 'is_demo': True, 'media_type': media_type, 'file_name': file_name } } paragraphs.append(paragraph) # 添加处理元数据 metadata = { 'media_processing_status': 'success', 'media_type': media_type, 'is_demo_content': True, 'processing_mode': 'default_text' } maxkb_logger.info(f"Successfully created {len(paragraphs)} default paragraphs for {file_name}") return { 'name': file.name, 'content': paragraphs, 'metadata': metadata } def _handle_actual_processing(self, file, get_buffer, **kwargs) -> dict: """实际处理音视频文件""" maxkb_logger.info(f"Starting actual processing for media file: {file.name}") # 初始化适配器 if not self.adapter: from .media_adapter import MediaAdapter from .media_adapter.logger import MediaLogger logger_wrapper = MediaLogger(maxkb_logger) self.adapter = MediaAdapter(logger=logger_wrapper) # 获取文件内容 buffer = get_buffer(file) # 获取模型ID和工作空间ID stt_model_id = kwargs.get('stt_model_id') llm_model_id = kwargs.get('llm_model_id') workspace_id = kwargs.get('workspace_id') maxkb_logger.info(f"Extracted from kwargs - stt_model_id: {stt_model_id}, llm_model_id: {llm_model_id}, workspace_id: {workspace_id}") # 处理选项 options_param = kwargs.get('options', {}) options = { 'language': options_param.get('language', kwargs.get('language', 'auto')), 'segment_duration': options_param.get('segment_duration', kwargs.get('segment_duration', 300)), 'enable_punctuation': options_param.get('enable_punctuation', kwargs.get('enable_punctuation', True)), 'enable_summary': True, 'extract_keyframes': options_param.get('extract_keyframes', kwargs.get('extract_keyframes', False)) } try: # 调用适配器处理 result = self.adapter.process_media( file_content=buffer, file_name=file.name, stt_model_id=stt_model_id, llm_model_id=llm_model_id, workspace_id=workspace_id, options=options ) # 转换为MaxKB段落格式 paragraphs = [] for segment in result.get('segments', []): # 使用优化后的文本(如果有) text = segment.get('enhanced_text', segment.get('text', '')) # 添加时间戳信息 if segment.get('start_time') is not None: time_info = f"[{self._format_time(segment['start_time'])} - {self._format_time(segment['end_time'])}]" text = f"{time_info}\n{text}" # 添加摘要(如果有) if segment.get('summary'): text = f"## 摘要\n\n{segment['summary']}\n\n---\n\n{text}" maxkb_logger.info(f"Adding summary to paragraph: {segment['summary'][:50]}...") paragraph = { 'content': text, 'title': f"段落 {segment.get('index', 0) + 1}", 'metadata': { 'start_time': segment.get('start_time'), 'end_time': segment.get('end_time'), 'index': segment.get('index'), 'is_demo': False, 'media_type': 'actual' } } paragraphs.append(paragraph) # 添加成功处理的标记 metadata = result.get('metadata', {}) metadata['media_processing_status'] = 'success' metadata['is_demo_content'] = False metadata['processing_mode'] = 'actual_processing' maxkb_logger.info(f"Successfully processed {file.name}, generated {len(paragraphs)} actual paragraphs") return { 'name': file.name, 'content': paragraphs, 'metadata': metadata } except Exception as e: maxkb_logger.error(f"实际处理音视频文件失败: {str(e)}") # 返回错误信息 return { 'name': file.name, 'content': [{ 'content': f'实际处理失败: {str(e)}', 'title': '错误' }], 'metadata': {'error': str(e), 'media_processing_status': 'failed'} } def get_content(self, file, save_image): """获取文件内容(用于预览)""" try: file_name = file.name # 判断媒体类型 file_ext = file_name.lower().split('.')[-1] video_exts = {'mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv'} if file_ext in video_exts: return f"[视频文件: {file_name}]\n\n该文件需要进行音频提取和语音识别处理。" else: return f"[音频文件: {file_name}]\n\n该文件需要进行语音识别处理。" except Exception as e: return f"读取文件失败: {str(e)}" def _format_time(self, seconds: float) -> str: """格式化时间""" if seconds is None: return "00:00" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) if hours > 0: return f"{hours:02d}:{minutes:02d}:{secs:02d}" else: return f"{minutes:02d}:{secs:02d}" def smart_split_transcription(self, text, limit=1000, overlap=100): """智能分割转录文本""" if len(text) <= limit: return [text] # 预处理:移除过多的空行 text = '\n'.join(line for line in text.split('\n') if line.strip()) chunks = [] start = 0 while start < len(text): # 计算当前块的结束位置 end = min(start + limit, len(text)) # 如果不是最后一块,尝试在句号处断开 if end < len(text): # 寻找最佳断点 best_end = end # 优先在句号处断开 last_period = text.rfind('。', start, end) if last_period > start + limit // 2: best_end = last_period + 1 else: # 其次在换行符处断开 last_newline = text.rfind('\n', start, end) if last_newline > start + limit // 3: best_end = last_newline + 1 end = best_end # 提取当前块 chunk = text[start:end].strip() if chunk: chunks.append(chunk) # 计算下一块的开始位置(考虑重叠) start = max(end - overlap, len(chunks[-1]) if chunks else 0) # 避免无限循环 if start >= len(text): break return chunks def _apply_smart_split(self, result: dict, limit: int = 1000, with_filter: bool = False): """应用智能分块到转录结果""" overlap = 100 # 前后重叠字符数 new_paragraphs = [] for paragraph in result.get('content', []): content = paragraph.get('content', '') # 应用文本过滤(如果需要) if with_filter: content = self._clean_text(content) if content: # 应用智能分块 chunks = self.smart_split_transcription(content, limit, overlap) # 如果只有一个块且没有变化,保持原样 if len(chunks) == 1 and chunks[0] == content: new_paragraphs.append(paragraph) else: # 创建新的段落 for idx, chunk in enumerate(chunks): # 保留原始元数据,但更新分段相关信息 metadata = paragraph.get('metadata', {}).copy() metadata.update({ 'chunk_index': idx, 'total_chunks': len(chunks), 'split_method': 'smart_transcription', 'split_limit': limit, 'split_overlap': overlap, 'with_filter': with_filter }) new_paragraph = { 'content': chunk, 'title': f"{paragraph.get('title', '段落')} - 第{idx + 1}部分" if len(chunks) > 1 else paragraph.get('title', '段落'), 'metadata': metadata } new_paragraphs.append(new_paragraph) else: new_paragraphs.append(paragraph) # 更新结果 result['content'] = new_paragraphs # 更新元数据 metadata = result.get('metadata', {}) metadata['smart_split_applied'] = True metadata['total_chunks'] = len(new_paragraphs) result['metadata'] = metadata maxkb_logger.info(f"Applied smart transcription split: {len(new_paragraphs)} chunks") return result def _clean_text(self, text): """清理文本:去掉重复多余符号空格、空行、制表符""" import re # 移除多余的空白字符 text = re.sub(r'\s+', ' ', text) # 移除开头和结尾的空白 text = text.strip() # 移除重复的标点符号 text = re.sub(r'([。!?,])\1+', r'\1', text) # 移除多余的换行 text = re.sub(r'\n{3,}', '\n\n', text) return text