#!/usr/bin/env python3
"""
Multi-keyword search MCP server.

Supports keyword array matching and sorts output by match count.
References the implementation style of json_reader_server.py.
"""

import json
import os
import sys
import asyncio
import re
from typing import Any, Dict, List, Optional, Union

from mcp_common import (
    get_allowed_directory,
    load_tools_from_json,
    resolve_file_path,
    find_file_in_project,
    is_regex_pattern,
    compile_pattern,
    create_error_response,
    create_success_response,
    create_initialize_response,
    create_ping_response,
    create_tools_list_response,
    handle_mcp_streaming
)


class _ToolInputError(Exception):
    """Signal unusable tool arguments; the message is returned to the client as text."""


def _text_response(text: str) -> Dict[str, Any]:
    """Wrap *text* in the single-item ``content`` payload every tool returns."""
    return {"content": [{"type": "text", "text": text}]}


def _warn(message: str) -> None:
    """Emit a diagnostic on stderr.

    NOTE(review): stdout is presumably the JSON-RPC channel driven by
    ``handle_mcp_streaming`` (defined in mcp_common, not visible here), so
    diagnostics are kept off it -- confirm against the transport in use.
    """
    print(message, file=sys.stderr)


def _resolve_paths(file_paths: List[str]) -> List[str]:
    """Resolve every requested path, dropping the ones that cannot be resolved.

    Raises:
        _ToolInputError: if none of the paths could be resolved.
    """
    resolved_paths = []
    for file_path in file_paths:
        try:
            # Per the original comment: accepts both "folder/document.txt"
            # and "document.txt" forms.
            resolved_paths.append(resolve_file_path(file_path))
        except Exception:
            # Deliberate best effort: one bad path must not fail the request.
            continue

    if not resolved_paths:
        raise _ToolInputError(
            f"Error: Specified files not found in project directory {get_allowed_directory()}"
        )
    return resolved_paths


def _read_lines(file_path: str) -> Optional[List[str]]:
    """Return the lines of *file_path*, or None when it cannot be read."""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            return f.readlines()
    except Exception:
        # Deliberate best effort: unreadable files are skipped by callers.
        return None


def parse_patterns_with_weights(patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Validate weighted search patterns and normalise their weights to float.

    Only the weighted format is supported:
    [{"pattern": "keyword1", "weight": 2.0}, {"pattern": "/regex/", "weight": 0.5}]

    Args:
        patterns: Items to validate; each must be a dict carrying both a
            'pattern' and a 'weight' entry.

    Returns:
        A list of ``{'pattern': ..., 'weight': float}`` dicts in input order.

    Raises:
        ValueError: if an item is not a dict, lacks 'pattern' or 'weight',
            or has a weight that is not numeric or not strictly positive.
    """
    parsed_patterns = []
    for item in patterns:
        if not isinstance(item, dict):
            raise ValueError(f"Error: Search pattern must be in dictionary format with 'pattern' and 'weight' fields. Invalid item: {item}")

        pattern = item.get('pattern')
        weight = item.get('weight')

        if pattern is None:
            raise ValueError(f"Error: Missing 'pattern' field. Invalid item: {item}")
        if weight is None:
            raise ValueError(f"Error: Missing 'weight' field. Invalid item: {item}")

        # Ensure the weight is numeric.
        try:
            numeric_weight = float(weight)
        except (ValueError, TypeError):
            raise ValueError(f"Error: Weight must be a valid number. Invalid weight: {weight}") from None

        # Checked outside the try block so this message is not swallowed and
        # replaced by the "valid number" one above.
        if numeric_weight <= 0:
            raise ValueError(f"Error: Weight must be a positive number. Invalid weight: {numeric_weight}")

        parsed_patterns.append({'pattern': pattern, 'weight': numeric_weight})

    return parsed_patterns


def _prepare_weighted_search(patterns: List[Dict[str, Any]], file_paths: List[str]):
    """Validate the arguments shared by ``search`` and ``search_count``.

    Returns:
        ``(valid_patterns, valid_paths)`` where each valid pattern is a dict
        with 'pattern', 'weight' and 'compiled_pattern'.

    Raises:
        _ToolInputError: with the client-facing message when the arguments
            cannot be used.
    """
    if not patterns:
        raise _ToolInputError("Error: Search pattern list cannot be empty")

    # Parse search patterns and weights.
    try:
        parsed_patterns = parse_patterns_with_weights(patterns)
    except ValueError as e:
        raise _ToolInputError(str(e)) from e

    if not parsed_patterns:
        raise _ToolInputError("Error: No valid search patterns")
    if not file_paths:
        raise _ToolInputError("Error: File path list cannot be empty")

    # Compile the patterns; those compile_pattern rejects (returns None for)
    # are reported and ignored.
    valid_patterns = []
    regex_errors = []
    for pattern_info in parsed_patterns:
        pattern = pattern_info['pattern']
        compiled = compile_pattern(pattern)
        if compiled is None:
            regex_errors.append(pattern)
        else:
            valid_patterns.append({
                'pattern': pattern,
                'weight': pattern_info['weight'],
                'compiled_pattern': compiled
            })

    if regex_errors:
        # str() guards the join against non-string pattern values.
        _warn(f"Warning: The following regular expressions failed to compile and will be ignored: {', '.join(str(p) for p in regex_errors)}")

    return valid_patterns, _resolve_paths(file_paths)


def _collect_weighted_matches(valid_paths: List[str], valid_patterns: List[Dict[str, Any]],
                              case_sensitive: bool) -> List[Dict[str, Any]]:
    """Run ``search_patterns_in_file`` over every path, skipping files that fail."""
    all_results = []
    for file_path in valid_paths:
        try:
            all_results.extend(search_patterns_in_file(file_path, valid_patterns, case_sensitive))
        except Exception:
            # Deliberate best effort: a failure in one file keeps the others.
            continue
    return all_results


def search_count(patterns: List[Dict[str, Any]], file_paths: List[str],
                 case_sensitive: bool = False) -> Dict[str, Any]:
    """Evaluate multi-pattern match counts (keywords and regex) with required weights.

    Args:
        patterns: Weighted patterns, see ``parse_patterns_with_weights``.
        file_paths: Paths to search; unresolvable ones are skipped.
        case_sensitive: Match case-sensitively when True.

    Returns:
        A ``{"content": [{"type": "text", "text": ...}]}`` payload holding
        either an error message or a statistics report (overall, by file,
        by pattern).
    """
    try:
        valid_patterns, valid_paths = _prepare_weighted_search(patterns, file_paths)
    except _ToolInputError as exc:
        return _text_response(str(exc))

    all_results = _collect_weighted_matches(valid_paths, valid_patterns, case_sensitive)

    # Count lines across all files (unreadable files contribute nothing).
    total_lines_searched = 0
    for file_path in valid_paths:
        lines = _read_lines(file_path)
        if lines is not None:
            total_lines_searched += len(lines)

    # Initialize pattern statistics so patterns with no hits are still listed.
    pattern_match_stats = {}
    for pattern_info in valid_patterns:
        pattern_match_stats[pattern_info['pattern']] = {
            'match_count': 0,
            'weight_score': 0.0,
            'lines_matched': set()
        }

    file_match_stats = {}
    total_weight_score = 0.0

    for result in all_results:
        result_score = result.get('weight_score', 0)
        total_weight_score += result_score

        # File-level statistics.
        file_path = result['file_path']
        if file_path not in file_match_stats:
            file_match_stats[file_path] = {
                'match_count': 0,
                'weight_score': 0.0,
                'lines_matched': set()
            }
        per_file = file_match_stats[file_path]
        per_file['match_count'] += 1
        per_file['weight_score'] += result_score
        per_file['lines_matched'].add(result['line_number'])

        # Pattern-level statistics. Lines are identified by (file, line
        # number): a bare line number would merge same-numbered lines of
        # different files into one.
        line_key = (file_path, result['line_number'])
        for pattern in result['matched_patterns']:
            per_pattern = pattern_match_stats.get(pattern['original'])
            if per_pattern is None:
                continue
            per_pattern['match_count'] += pattern['match_count']
            per_pattern['weight_score'] += pattern['weight_score']
            per_pattern['lines_matched'].add(line_key)

    # Format statistical output.
    formatted_lines = [
        "=== Matching Statistics Evaluation ===",
        f"Files searched: {len(valid_paths)}",
        f"Total lines searched: {total_lines_searched}",
        f"Total matched lines: {len(all_results)}",
        f"Total weight score: {total_weight_score:.2f}",
    ]
    if total_lines_searched > 0:
        formatted_lines.append(f"Match rate: {(len(all_results)/total_lines_searched*100):.2f}%")
    else:
        formatted_lines.append("Match rate: 0.00%")
    formatted_lines.append("")

    # Statistics by file, highest weight score first.
    formatted_lines.append("=== Statistics by File ===")
    for file_path, stats in sorted(file_match_stats.items(),
                                   key=lambda x: x[1]['weight_score'], reverse=True):
        formatted_lines.append(f"File: {os.path.basename(file_path)}")
        formatted_lines.append(f"  Matched lines: {len(stats['lines_matched'])}")
        formatted_lines.append(f"  Weight score: {stats['weight_score']:.2f}")
        formatted_lines.append("")

    # Statistics by pattern, highest weight score first.
    formatted_lines.append("=== Statistics by Pattern ===")
    for pattern, stats in sorted(pattern_match_stats.items(),
                                 key=lambda x: x[1]['weight_score'], reverse=True):
        formatted_lines.append(f"Pattern: {pattern}")
        formatted_lines.append(f"  Match count: {stats['match_count']}")
        formatted_lines.append(f"  Matched lines: {len(stats['lines_matched'])}")
        formatted_lines.append(f"  Weight score: {stats['weight_score']:.2f}")
        formatted_lines.append("")

    return _text_response("\n".join(formatted_lines))


def search(patterns: List[Dict[str, Any]], file_paths: List[str],
           limit: int = 10, case_sensitive: bool = False) -> Dict[str, Any]:
    """Run multi-pattern search (keywords and regex) with required weights.

    Args:
        patterns: Weighted patterns, see ``parse_patterns_with_weights``.
        file_paths: Paths to search; unresolvable ones are skipped.
        limit: Maximum number of matched lines to display.
        case_sensitive: Match case-sensitively when True.

    Returns:
        A ``{"content": [{"type": "text", "text": ...}]}`` payload holding
        an error message, "No matching results found", or a summary line
        followed by one formatted line per displayed match.
    """
    try:
        valid_patterns, valid_paths = _prepare_weighted_search(patterns, file_paths)
    except _ToolInputError as exc:
        return _text_response(str(exc))

    all_results = _collect_weighted_matches(valid_paths, valid_patterns, case_sensitive)

    # Sort by weight score in descending order, then by match count when
    # scores are equal.
    all_results.sort(key=lambda x: (x.get('weight_score', 0), x['match_count']), reverse=True)

    limited_results = all_results[:limit]
    if not limited_results:
        return _text_response("No matching results found")

    # First line: total matches and how many are displayed.
    formatted_lines = [
        f"Found {len(all_results)} matches, showing top {len(limited_results)} results:"
    ]

    for result in limited_results:
        weight_score = result.get('weight_score', 0)
        line_prefix = f"{result['line_number']}:weight({weight_score:.2f}):"

        # Build match details.
        match_details = []
        for pattern in result['matched_patterns']:
            if pattern['type'] == 'regex':
                match_details.append(f"[regex:{pattern['original']}={pattern['match']}]")
            else:
                match_details.append(f"[keyword:{pattern['match']}]")

        match_info = " ".join(match_details)
        if match_info:
            formatted_lines.append(f"{line_prefix}{match_info}:{result['content']}")
        else:
            formatted_lines.append(f"{line_prefix}{result['content']}")

    return _text_response("\n".join(formatted_lines))


def search_patterns_in_file(file_path: str, patterns: List[Dict[str, Any]],
                            case_sensitive: bool) -> List[Dict[str, Any]]:
    """Search patterns in a single file with keyword, regex and weighted scoring.

    Args:
        file_path: File to read (UTF-8, undecodable bytes ignored).
        patterns: Dicts with 'pattern' (original text), 'weight' and
            'compiled_pattern'. A compiled ``re.Pattern`` is matched as a
            regex; anything else is matched as a substring and is assumed
            to be a str -- TODO confirm against ``compile_pattern``.
        case_sensitive: Match case-sensitively when True.

    Returns:
        One dict per line with a positive weight score, carrying
        'line_number', 'content', 'match_count', 'weight_score',
        'matched_patterns' and 'file_path'. Empty when the file cannot be
        read.
    """
    results = []

    lines = _read_lines(file_path)
    if lines is None:
        return results

    # Prepare every pattern once, outside the per-line loop: the
    # case-insensitive regex variant and the lowered keyword do not depend
    # on the line being examined.
    matchers = []
    for pattern_info in patterns:
        compiled = pattern_info['compiled_pattern']
        if compiled is None:
            # Skip invalid regular expressions.
            continue

        is_regex = isinstance(compiled, re.Pattern)
        if is_regex:
            needle = compiled
            if not case_sensitive:
                needle = re.compile(compiled.pattern, compiled.flags | re.IGNORECASE)
        else:
            needle = compiled if case_sensitive else compiled.lower()

        matchers.append({
            'original': pattern_info['pattern'],
            'is_regex': is_regex,
            'needle': needle,
            'display': compiled,  # keyword is reported in its original case
            'weight': pattern_info['weight']
        })

    for line_number, line in enumerate(lines, 1):
        line_content = line.rstrip('\n\r')
        search_line = line_content if case_sensitive else line_content.lower()

        matched_patterns = []
        weight_score = 0.0

        for matcher in matchers:
            if matcher['is_regex']:
                # search() yields the same first match finditer() would.
                found = matcher['needle'].search(line_content)
                if found is None:
                    continue
                match_details = found.group(0)
            else:
                if matcher['needle'] not in search_line:
                    continue
                match_details = matcher['display']

            # Repeated matches of one pattern on a line only count once for
            # weighting.
            match_count_in_line = 1
            pattern_weight_score = matcher['weight'] * match_count_in_line
            weight_score += pattern_weight_score

            matched_patterns.append({
                'original': matcher['original'],
                'type': 'regex' if matcher['is_regex'] else 'keyword',
                'match': match_details,
                'weight': matcher['weight'],
                'match_count': match_count_in_line,
                'weight_score': pattern_weight_score
            })

        if weight_score > 0:
            results.append({
                'line_number': line_number,
                'content': line_content,
                'match_count': len(matched_patterns),
                'weight_score': weight_score,
                'matched_patterns': matched_patterns,
                'file_path': file_path
            })

    return results


def _prepare_regex_search(patterns: Union[str, List[str]], file_paths: List[str],
                          case_sensitive: bool):
    """Validate the arguments shared by ``regex_grep`` and ``regex_grep_count``.

    Returns:
        ``(compiled_patterns, valid_paths)`` where compiled_patterns is a
        list of ``(pattern_text, re.Pattern)`` pairs.

    Raises:
        _ToolInputError: with the client-facing message when the arguments
            cannot be used.
    """
    # A single pattern may be passed as a bare string.
    if isinstance(patterns, str):
        patterns = [patterns]

    # Strip the patterns and drop the ones that end up empty.
    cleaned_patterns = [p.strip() for p in patterns or [] if p.strip()]
    if not cleaned_patterns:
        raise _ToolInputError("Error: Patterns cannot be empty")

    if not file_paths:
        raise _ToolInputError("Error: File path list cannot be empty")

    flags = 0 if case_sensitive else re.IGNORECASE
    compiled_patterns = []
    for pattern in cleaned_patterns:
        try:
            compiled_patterns.append((pattern, re.compile(pattern, flags)))
        except re.error as e:
            # Invalid regular expressions are skipped with a warning.
            _warn(f"Warning: Invalid regular expression '{pattern}': {str(e)}, skipping...")

    if not compiled_patterns:
        raise _ToolInputError("Error: No valid regular expressions found")

    return compiled_patterns, _resolve_paths(file_paths)


def regex_grep(patterns: Union[str, List[str]], file_paths: List[str], context_lines: int = 0,
               case_sensitive: bool = False, limit: int = 50) -> Dict[str, Any]:
    """Search file contents with regular expressions, with optional context lines.

    Args:
        patterns: One regex or a list of regexes; invalid ones are skipped.
        file_paths: Paths to search; unresolvable ones are skipped.
        context_lines: Number of lines to include before and after a match.
        case_sensitive: Compile without ``re.IGNORECASE`` when True.
        limit: Maximum number of matches to display.

    Returns:
        A ``{"content": [{"type": "text", "text": ...}]}`` payload holding
        an error message, "No matches found", or a summary line followed by
        matches grouped by file.
    """
    try:
        compiled_patterns, valid_paths = _prepare_regex_search(patterns, file_paths, case_sensitive)
    except _ToolInputError as exc:
        return _text_response(str(exc))

    # Collect all match results.
    all_results = []
    for file_path in valid_paths:
        try:
            for pattern, compiled_pattern in compiled_patterns:
                all_results.extend(
                    regex_search_in_file(file_path, compiled_pattern, context_lines,
                                         case_sensitive, pattern)
                )
        except Exception:
            # Deliberate best effort: a failure in one file keeps the others.
            continue

    # Sort by file path and line number.
    all_results.sort(key=lambda x: (x['file_path'], x['match_line_number']))

    limited_results = all_results[:limit]
    if not limited_results:
        return _text_response("No matches found")

    formatted_lines = [
        f"Found {len(all_results)} matches for {len(compiled_patterns)} patterns, showing top {len(limited_results)} results:"
    ]

    # Display results grouped by file; a header is emitted whenever the file
    # changes between consecutive (sorted) results.
    current_file = None
    for result in limited_results:
        file_path = result['file_path']
        if file_path != current_file:
            current_file = file_path
            formatted_lines.append(f"\n--- File: {os.path.basename(file_path)} ---")

        match_line = result['match_line_number']
        matched_content = result['matched_content']
        pattern = result.get('pattern', 'unknown')

        # The matched text is printed first, then its context lines.
        formatted_lines.append(f"{match_line}[pattern: {pattern}]:{matched_content}")

        for context_line in result.get('context_before', ()):
            formatted_lines.append(f"{context_line['line_number']}:{context_line['content']}")
        for context_line in result.get('context_after', ()):
            formatted_lines.append(f"{context_line['line_number']}:{context_line['content']}")

    return _text_response("\n".join(formatted_lines))


def regex_grep_count(patterns: Union[str, List[str]], file_paths: List[str],
                     case_sensitive: bool = False) -> Dict[str, Any]:
    """Count regex matches, supporting multiple patterns.

    Args:
        patterns: One regex or a list of regexes; invalid ones are skipped.
        file_paths: Paths to search; unresolvable ones are skipped.
        case_sensitive: Compile without ``re.IGNORECASE`` when True.

    Returns:
        A ``{"content": [{"type": "text", "text": ...}]}`` payload holding
        an error message or a statistics report (overall, by pattern, by
        file).
    """
    try:
        compiled_patterns, valid_paths = _prepare_regex_search(patterns, file_paths, case_sensitive)
    except _ToolInputError as exc:
        return _text_response(str(exc))

    total_matches = 0
    # Summed per pattern, so a line hit by two patterns is counted twice.
    total_lines_with_matches = 0
    # Keyed by resolved path (not basename) so same-named files in
    # different directories do not overwrite each other.
    file_stats = {}
    pattern_stats = {
        pattern: {'matches': 0, 'lines_with_matches': 0}
        for pattern, _ in compiled_patterns
    }

    for file_path in valid_paths:
        file_matches = 0
        file_lines_with_matches = 0

        try:
            for pattern, compiled_pattern in compiled_patterns:
                matches, lines_with_matches = regex_count_in_file(
                    file_path, compiled_pattern, case_sensitive
                )
                total_matches += matches
                total_lines_with_matches += lines_with_matches
                file_matches += matches
                # Largest single-pattern line count: avoids double-counting
                # lines, at the cost of being a lower bound on the union.
                file_lines_with_matches = max(file_lines_with_matches, lines_with_matches)

                pattern_stats[pattern]['matches'] += matches
                pattern_stats[pattern]['lines_with_matches'] += lines_with_matches

            file_stats[file_path] = {
                'matches': file_matches,
                'lines_with_matches': file_lines_with_matches
            }
        except Exception:
            # Deliberate best effort: a failure in one file keeps the others.
            continue

    formatted_lines = [
        "=== Regex Match Statistics ===",
        f"Patterns: {', '.join([p for p, _ in compiled_patterns])}",
        f"Files searched: {len(valid_paths)}",
        f"Total matches: {total_matches}",
        f"Total lines with matches: {total_lines_with_matches}",
        "",
    ]

    # Statistics by pattern.
    formatted_lines.append("=== Statistics by Pattern ===")
    for pattern, stats in sorted(pattern_stats.items()):
        formatted_lines.append(f"Pattern: {pattern}")
        formatted_lines.append(f"  Matches: {stats['matches']}")
        formatted_lines.append(f"  Lines with matches: {stats['lines_with_matches']}")
        formatted_lines.append("")

    # Statistics by file, ordered by displayed name (path breaks ties).
    formatted_lines.append("=== Statistics by File ===")
    for file_path, stats in sorted(file_stats.items(),
                                   key=lambda x: (os.path.basename(x[0]), x[0])):
        formatted_lines.append(f"File: {os.path.basename(file_path)}")
        formatted_lines.append(f"  Matches: {stats['matches']}")
        formatted_lines.append(f"  Lines with matches: {stats['lines_with_matches']}")
        formatted_lines.append("")

    return _text_response("\n".join(formatted_lines))


def regex_search_in_file(file_path: str, pattern: re.Pattern, context_lines: int,
                         case_sensitive: bool, pattern_str: Optional[str] = None) -> List[Dict[str, Any]]:
    """Search a single file with a regex and optional context.

    Args:
        file_path: File to read (UTF-8, undecodable bytes ignored).
        pattern: Compiled regex applied to each line without its line ending.
        context_lines: Number of lines captured before and after each
            matching line; values <= 0 capture none.
        case_sensitive: Unused here; case handling is baked into *pattern*.
        pattern_str: Pattern text recorded in each result ('unknown' if
            missing or empty).

    Returns:
        One dict per match with 'file_path', 'match_line_number',
        'match_text' (the whole line), 'matched_content' (the matched
        text), 'pattern', 'start_pos', 'end_pos', plus 'context_before' /
        'context_after' when non-empty. Empty when the file cannot be read.
    """
    results = []

    lines = _read_lines(file_path)
    if lines is None:
        return results

    for line_number, line in enumerate(lines, 1):
        line_content = line.rstrip('\n\r')

        matches = list(pattern.finditer(line_content))
        if not matches:
            continue

        # Prepare context; line_number is 1-based, list indices are 0-based.
        context_before = []
        context_after = []
        if context_lines > 0:
            first_before = max(0, line_number - 1 - context_lines)
            context_before = [
                {'line_number': i + 1, 'content': lines[i].rstrip('\n\r')}
                for i in range(first_before, line_number - 1)
            ]
            end_after = min(len(lines), line_number + context_lines)
            context_after = [
                {'line_number': i + 1, 'content': lines[i].rstrip('\n\r')}
                for i in range(line_number, end_after)
            ]

        # Create a result for each match on the line.
        for match in matches:
            result = {
                'file_path': file_path,
                'match_line_number': line_number,
                'match_text': line_content,
                'matched_content': match.group(0),
                'pattern': pattern_str or 'unknown',
                'start_pos': match.start(),
                'end_pos': match.end()
            }
            if context_before:
                result['context_before'] = context_before
            if context_after:
                result['context_after'] = context_after
            results.append(result)

    return results


def regex_count_in_file(file_path: str, pattern: re.Pattern, case_sensitive: bool) -> tuple[int, int]:
    """Count matches in a file.

    Args:
        file_path: File to read (UTF-8, undecodable bytes ignored).
        pattern: Compiled regex applied to each line without its line ending.
        case_sensitive: Unused here; case handling is baked into *pattern*.

    Returns:
        ``(total_matches, lines_with_matches)``; ``(0, 0)`` when the file
        cannot be read.
    """
    total_matches = 0
    lines_with_matches = 0

    lines = _read_lines(file_path)
    if lines is None:
        return total_matches, lines_with_matches

    for line in lines:
        matches_in_line = sum(1 for _ in pattern.finditer(line.rstrip('\n\r')))
        if matches_in_line:
            total_matches += matches_in_line
            lines_with_matches += 1

    return total_matches, lines_with_matches


def _regex_patterns_argument(arguments: Dict[str, Any]) -> Any:
    """Read 'patterns', falling back to the legacy single 'pattern' argument."""
    patterns = arguments.get("patterns", [])
    if not patterns and "pattern" in arguments:
        patterns = arguments.get("pattern", "")
    return patterns


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle MCP request.

    Dispatches "initialize", "ping", "tools/list" and "tools/call"; for
    "tools/call" the tools "search", "search_count", "regex_grep" and
    "regex_grep_count" are supported. Unknown methods or tools produce a
    -32601 error response; any exception raised while handling produces a
    -32603 error response.
    """
    try:
        method = request.get("method")
        # "or {}" also covers an explicit null, not just a missing key.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "multi-keyword-search")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Load tool definitions from the JSON file.
            tools = load_tools_from_json("multi_keyword_search_tools.json")
            return create_tools_list_response(request_id, tools)

        if method != "tools/call":
            return create_error_response(request_id, -32601, f"Unknown method: {method}")

        tool_name = params.get("name")
        arguments = params.get("arguments") or {}

        if tool_name == "search":
            result = search(
                arguments.get("patterns", []),
                arguments.get("file_paths", []),
                arguments.get("limit", 10),
                arguments.get("case_sensitive", False),
            )
        elif tool_name == "search_count":
            result = search_count(
                arguments.get("patterns", []),
                arguments.get("file_paths", []),
                arguments.get("case_sensitive", False),
            )
        elif tool_name == "regex_grep":
            result = regex_grep(
                _regex_patterns_argument(arguments),
                arguments.get("file_paths", []),
                arguments.get("context_lines", 0),
                arguments.get("case_sensitive", False),
                arguments.get("limit", 50),
            )
        elif tool_name == "regex_grep_count":
            result = regex_grep_count(
                _regex_patterns_argument(arguments),
                arguments.get("file_paths", []),
                arguments.get("case_sensitive", False),
            )
        else:
            return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")

        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result
        }

    except Exception as e:
        return create_error_response(request.get("id"), -32603, f"Internal error: {str(e)}")


async def main():
    """Main entry point."""
    await handle_mcp_streaming(handle_request)


if __name__ == "__main__":
    asyncio.run(main())