Convert all Chinese comments, docstrings, logger/print output, HTTPException detail messages, and API response messages to English across the entire codebase. Functional zh/ja localized strings (e.g. prompt templates, timezone display names, date formats) are preserved as-is. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
982 lines
34 KiB
Python
982 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Multi-keyword search MCP server
|
|
Support keyword array matching and sort output by match count
|
|
Reference the implementation style of json_reader_server.py
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import re
|
|
from typing import Any, Dict, List, Optional, Union
|
|
from mcp_common import (
|
|
get_allowed_directory,
|
|
load_tools_from_json,
|
|
resolve_file_path,
|
|
find_file_in_project,
|
|
is_regex_pattern,
|
|
compile_pattern,
|
|
create_error_response,
|
|
create_success_response,
|
|
create_initialize_response,
|
|
create_ping_response,
|
|
create_tools_list_response,
|
|
handle_mcp_streaming
|
|
)
|
|
|
|
|
|
def parse_patterns_with_weights(patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Validate weighted search patterns and normalise their weights to floats.

    Only the weighted dictionary format is supported:
    [{"pattern": "keyword1", "weight": 2.0}, {"pattern": "/regex/", "weight": 0.5}]

    Args:
        patterns: Items that each carry a 'pattern' and a 'weight' entry.

    Returns:
        A new list of {'pattern': ..., 'weight': float} dictionaries, in input order.

    Raises:
        ValueError: If an item is not a dictionary, lacks 'pattern' or 'weight',
            or has a weight that is not a number greater than zero.
    """
    parsed_patterns = []

    for item in patterns:
        if not isinstance(item, dict):
            raise ValueError(f"Error: Search pattern must be in dictionary format with 'pattern' and 'weight' fields. Invalid item: {item}")

        pattern = item.get('pattern')
        weight = item.get('weight')

        if pattern is None:
            raise ValueError(f"Error: Missing 'pattern' field. Invalid item: {item}")

        if weight is None:
            raise ValueError(f"Error: Missing 'weight' field. Invalid item: {item}")

        # Only the conversion lives inside the try block; the range check below
        # must not be caught and re-labelled as a "not a number" error.
        try:
            numeric_weight = float(weight)
        except (ValueError, TypeError):
            raise ValueError(f"Error: Weight must be a valid number. Invalid weight: {weight}") from None

        # `not > 0` (rather than `<= 0`) also rejects NaN, which compares False
        # against everything.
        if not numeric_weight > 0:
            raise ValueError(f"Error: Weight must be a positive number. Invalid weight: {numeric_weight}")

        parsed_patterns.append({
            'pattern': pattern,
            'weight': numeric_weight
        })

    return parsed_patterns
|
|
|
|
|
|
def search_count(patterns: List[Dict[str, Any]], file_paths: List[str],
                 case_sensitive: bool = False) -> Dict[str, Any]:
    """Summarise how often weighted keyword/regex patterns match in the given files.

    Args:
        patterns: Weighted pattern items, e.g. [{"pattern": "foo", "weight": 2.0}].
        file_paths: Paths handed to resolve_file_path; paths it rejects are skipped.
        case_sensitive: Forwarded unchanged to search_patterns_in_file.

    Returns:
        A {"content": [{"type": "text", "text": ...}]} dictionary whose text is
        either an "Error: ..." message or a report with overall, per-file and
        per-pattern statistics.
    """
    def text_response(text: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": text}]}

    if not patterns:
        return text_response("Error: Search pattern list cannot be empty")

    # Parse search patterns and weights
    try:
        parsed_patterns = parse_patterns_with_weights(patterns)
    except ValueError as e:
        return text_response(str(e))

    if not parsed_patterns:
        return text_response("Error: No valid search patterns")

    if not file_paths:
        return text_response("Error: File path list cannot be empty")

    # Compile every pattern once; patterns that fail to compile are reported and dropped.
    valid_patterns = []
    regex_errors = []
    for pattern_info in parsed_patterns:
        pattern = pattern_info['pattern']
        compiled = compile_pattern(pattern)
        if compiled is None:
            regex_errors.append(pattern)
        else:
            valid_patterns.append({
                'pattern': pattern,
                'weight': pattern_info['weight'],
                'compiled_pattern': compiled
            })

    if regex_errors:
        # NOTE(review): sent to stderr on the assumption that stdout carries the
        # JSON-RPC stream written by handle_mcp_streaming - confirm.
        # str() keeps the join from failing on non-string pattern values.
        error_msg = f"Warning: The following regular expressions failed to compile and will be ignored: {', '.join(map(str, regex_errors))}"
        print(error_msg, file=sys.stderr)

    # Resolve file paths (folder/document.txt and document.txt forms); skip failures.
    valid_paths = []
    for file_path in file_paths:
        try:
            valid_paths.append(resolve_file_path(file_path))
        except Exception:
            continue

    if not valid_paths:
        return text_response(f"Error: Specified files not found in project directory {get_allowed_directory()}")

    # Collect the matching lines of every file; a failing file is skipped.
    all_results = []
    for file_path in valid_paths:
        try:
            all_results.extend(search_patterns_in_file(file_path, valid_patterns, case_sensitive))
        except Exception:
            continue

    # Count the lines of every readable file for the match-rate denominator.
    total_lines_searched = 0
    for file_path in valid_paths:
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                total_lines_searched += len(f.readlines())
        except Exception:
            continue

    total_weight_score = 0.0
    file_match_stats = {}
    pattern_match_stats = {
        pattern_info['pattern']: {
            'match_count': 0,
            'weight_score': 0.0,
            'lines_matched': set()
        }
        for pattern_info in valid_patterns
    }

    for result in all_results:
        line_score = result.get('weight_score', 0)
        total_weight_score += line_score

        # File-level statistics
        file_path = result['file_path']
        file_stats = file_match_stats.setdefault(file_path, {
            'match_count': 0,
            'weight_score': 0.0,
            'lines_matched': set()
        })
        file_stats['match_count'] += 1
        file_stats['weight_score'] += line_score
        file_stats['lines_matched'].add(result['line_number'])

        # Pattern-level statistics
        for matched in result['matched_patterns']:
            stats = pattern_match_stats.get(matched['original'])
            if stats is None:
                continue
            stats['match_count'] += matched['match_count']
            stats['weight_score'] += matched['weight_score']
            # Key on (file, line): a pattern can hit the same line number in
            # several files, and those are distinct matched lines.
            stats['lines_matched'].add((file_path, result['line_number']))

    # Format statistical output
    formatted_lines = [
        "=== Matching Statistics Evaluation ===",
        f"Files searched: {len(valid_paths)}",
        f"Total lines searched: {total_lines_searched}",
        f"Total matched lines: {len(all_results)}",
        f"Total weight score: {total_weight_score:.2f}",
    ]
    if total_lines_searched > 0:
        formatted_lines.append(f"Match rate: {(len(all_results)/total_lines_searched*100):.2f}%")
    else:
        formatted_lines.append("Match rate: 0.00%")
    formatted_lines.append("")

    # Statistics by file, highest weight score first
    formatted_lines.append("=== Statistics by File ===")
    for file_path, stats in sorted(file_match_stats.items(), key=lambda x: x[1]['weight_score'], reverse=True):
        file_name = os.path.basename(file_path)
        formatted_lines.append(f"File: {file_name}")
        formatted_lines.append(f" Matched lines: {len(stats['lines_matched'])}")
        formatted_lines.append(f" Weight score: {stats['weight_score']:.2f}")
        formatted_lines.append("")

    # Statistics by pattern, highest weight score first
    formatted_lines.append("=== Statistics by Pattern ===")
    for pattern, stats in sorted(pattern_match_stats.items(), key=lambda x: x[1]['weight_score'], reverse=True):
        formatted_lines.append(f"Pattern: {pattern}")
        formatted_lines.append(f" Match count: {stats['match_count']}")
        formatted_lines.append(f" Matched lines: {len(stats['lines_matched'])}")
        formatted_lines.append(f" Weight score: {stats['weight_score']:.2f}")
        formatted_lines.append("")

    return text_response("\n".join(formatted_lines))
|
|
|
|
|
|
def search(patterns: List[Dict[str, Any]], file_paths: List[str],
           limit: int = 10, case_sensitive: bool = False) -> Dict[str, Any]:
    """Search files for weighted keyword/regex patterns and list the best-scoring lines.

    Args:
        patterns: Weighted pattern items, e.g. [{"pattern": "foo", "weight": 2.0}].
        file_paths: Paths handed to resolve_file_path; paths it rejects are skipped.
        limit: Maximum number of result lines to show. Negative values are
            treated as 0; None shows every result.
        case_sensitive: Forwarded unchanged to search_patterns_in_file.

    Returns:
        A {"content": [{"type": "text", "text": ...}]} dictionary. The text is an
        "Error: ..." message, "No matching results found", or a summary line
        followed by one "<line>:weight(<score>):[details]:<content>" line per result.
    """
    def text_response(text: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": text}]}

    if not patterns:
        return text_response("Error: Search pattern list cannot be empty")

    # Parse search patterns and weights
    try:
        parsed_patterns = parse_patterns_with_weights(patterns)
    except ValueError as e:
        return text_response(str(e))

    if not parsed_patterns:
        return text_response("Error: No valid search patterns")

    if not file_paths:
        return text_response("Error: File path list cannot be empty")

    # Compile every pattern once; patterns that fail to compile are reported and dropped.
    valid_patterns = []
    regex_errors = []
    for pattern_info in parsed_patterns:
        pattern = pattern_info['pattern']
        compiled = compile_pattern(pattern)
        if compiled is None:
            regex_errors.append(pattern)
        else:
            valid_patterns.append({
                'pattern': pattern,
                'weight': pattern_info['weight'],
                'compiled_pattern': compiled
            })

    if regex_errors:
        # NOTE(review): sent to stderr on the assumption that stdout carries the
        # JSON-RPC stream written by handle_mcp_streaming - confirm.
        # str() keeps the join from failing on non-string pattern values.
        error_msg = f"Warning: The following regular expressions failed to compile and will be ignored: {', '.join(map(str, regex_errors))}"
        print(error_msg, file=sys.stderr)

    # Resolve file paths (folder/document.txt and document.txt forms); skip failures.
    valid_paths = []
    for file_path in file_paths:
        try:
            valid_paths.append(resolve_file_path(file_path))
        except Exception:
            continue

    if not valid_paths:
        return text_response(f"Error: Specified files not found in project directory {get_allowed_directory()}")

    # Collect the matching lines of every file; a failing file is skipped.
    all_results = []
    for file_path in valid_paths:
        try:
            all_results.extend(search_patterns_in_file(file_path, valid_patterns, case_sensitive))
        except Exception:
            continue

    # Decide "nothing found" on the full result set, so that limit=0 is not
    # reported as an absence of matches.
    if not all_results:
        return text_response("No matching results found")

    # Sort by weight score in descending order, then by match count when scores are equal
    all_results.sort(key=lambda x: (x.get('weight_score', 0), x['match_count']), reverse=True)

    # A negative limit would otherwise slice from the end of the list.
    if limit is not None and limit < 0:
        limit = 0
    limited_results = all_results[:limit]

    # First line: total match count and number of results shown
    formatted_lines = [f"Found {len(all_results)} matches, showing top {len(limited_results)} results:"]

    for result in limited_results:
        weight_score = result.get('weight_score', 0)
        line_prefix = f"{result['line_number']}:weight({weight_score:.2f}):"

        # Build match details
        match_details = []
        for pattern in result['matched_patterns']:
            if pattern['type'] == 'regex':
                match_details.append(f"[regex:{pattern['original']}={pattern['match']}]")
            else:
                match_details.append(f"[keyword:{pattern['match']}]")

        match_info = " ".join(match_details)
        if match_info:
            formatted_lines.append(f"{line_prefix}{match_info}:{result['content']}")
        else:
            formatted_lines.append(f"{line_prefix}{result['content']}")

    return text_response("\n".join(formatted_lines))
|
|
|
|
|
|
def search_patterns_in_file(file_path: str, patterns: List[Dict[str, Any]],
                            case_sensitive: bool) -> List[Dict[str, Any]]:
    """Score every line of one file against weighted keyword and regex patterns.

    Args:
        file_path: File to read (UTF-8; undecodable bytes are ignored).
        patterns: Items with 'pattern' (original text), 'weight' and
            'compiled_pattern' (an re.Pattern for regexes, otherwise the
            keyword to look for; None entries are skipped).
        case_sensitive: When False, keywords are compared in lower case and
            regexes are matched with re.IGNORECASE added.

    Returns:
        One dictionary per line with a positive score, in file order, holding
        'line_number', 'content', 'match_count', 'weight_score',
        'matched_patterns' and 'file_path'. An unreadable file yields [].
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()
    except Exception:
        # Best effort: an unreadable file contributes no results.
        return []

    # Build one matcher per pattern up front so nothing is recompiled or
    # re-lowercased for every line.
    prepared = []
    for pattern_info in patterns:
        compiled = pattern_info['compiled_pattern']
        if compiled is None:  # Skip invalid regular expressions
            continue
        is_regex = isinstance(compiled, re.Pattern)
        if case_sensitive:
            matcher = compiled
        elif is_regex:
            matcher = re.compile(compiled.pattern, compiled.flags | re.IGNORECASE)
        else:
            matcher = compiled.lower()
        prepared.append({
            'original': pattern_info['pattern'],
            'keyword': compiled,
            'matcher': matcher,
            'is_regex': is_regex,
            'weight': pattern_info['weight']
        })

    # Repeated hits of one pattern within a line count once for weighting.
    match_count_in_line = 1

    results = []
    for line_number, line in enumerate(lines, 1):
        line_content = line.rstrip('\n\r')
        search_line = line_content if case_sensitive else line_content.lower()

        matched_patterns = []
        weight_score = 0.0

        for info in prepared:
            if info['is_regex']:
                # Only the first match is reported, so search() is enough.
                first_match = info['matcher'].search(line_content)
                if first_match is None:
                    continue
                match_details = first_match.group(0)
            else:
                if info['matcher'] not in search_line:
                    continue
                # Report the keyword as supplied, not its lower-cased form.
                match_details = info['keyword']

            pattern_weight_score = info['weight'] * match_count_in_line
            weight_score += pattern_weight_score

            matched_patterns.append({
                'original': info['original'],
                'type': 'regex' if info['is_regex'] else 'keyword',
                'match': match_details,
                'weight': info['weight'],
                'match_count': match_count_in_line,
                'weight_score': pattern_weight_score
            })

        if weight_score > 0:
            results.append({
                'line_number': line_number,
                'content': line_content,
                'match_count': len(matched_patterns),
                'weight_score': weight_score,
                'matched_patterns': matched_patterns,
                'file_path': file_path
            })

    return results
|
|
|
|
|
|
|
|
|
|
def regex_grep(patterns: Union[str, List[str]], file_paths: List[str], context_lines: int = 0,
               case_sensitive: bool = False, limit: int = 50) -> Dict[str, Any]:
    """Search files with one or more regular expressions, optionally with context lines.

    Args:
        patterns: A single regex or a list of regexes; blank entries are dropped
            and entries that fail to compile are skipped with a warning.
        file_paths: Paths handed to resolve_file_path; paths it rejects are skipped.
        context_lines: Number of lines of context requested around each match.
        case_sensitive: When False, patterns are compiled with re.IGNORECASE.
        limit: Maximum number of matches to show. Negative values are treated
            as 0; None shows every match.

    Returns:
        A {"content": [{"type": "text", "text": ...}]} dictionary. The text is an
        "Error: ..." message, "No matches found", or a summary line followed by
        the matches grouped by file.
    """
    def text_response(text: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": text}]}

    # Accept a single pattern string as well as a list
    if isinstance(patterns, str):
        patterns = [patterns]

    # Strip the patterns and drop the blank ones
    patterns = [p.strip() for p in (patterns or []) if p.strip()]
    if not patterns:
        return text_response("Error: Patterns cannot be empty")

    if not file_paths:
        return text_response("Error: File path list cannot be empty")

    # Compile regular expressions
    flags = 0 if case_sensitive else re.IGNORECASE
    compiled_patterns = []
    for pattern in patterns:
        try:
            compiled_patterns.append((pattern, re.compile(pattern, flags)))
        except re.error as e:
            # NOTE(review): sent to stderr on the assumption that stdout carries
            # the JSON-RPC stream written by handle_mcp_streaming - confirm.
            print(f"Warning: Invalid regular expression '{pattern}': {str(e)}, skipping...", file=sys.stderr)

    if not compiled_patterns:
        return text_response("Error: No valid regular expressions found")

    # Resolve file paths (folder/document.txt and document.txt forms); skip failures.
    valid_paths = []
    for file_path in file_paths:
        try:
            valid_paths.append(resolve_file_path(file_path))
        except Exception:
            continue

    if not valid_paths:
        return text_response(f"Error: Specified files not found in project directory {get_allowed_directory()}")

    # Collect all match results; an error abandons the remaining patterns of that file.
    all_results = []
    for file_path in valid_paths:
        try:
            for pattern, compiled_pattern in compiled_patterns:
                all_results.extend(
                    regex_search_in_file(file_path, compiled_pattern, context_lines, case_sensitive, pattern)
                )
        except Exception:
            continue

    # Decide "nothing found" on the full result set, so that limit=0 is not
    # reported as an absence of matches.
    if not all_results:
        return text_response("No matches found")

    # Sort by file path and line number
    all_results.sort(key=lambda x: (x['file_path'], x['match_line_number']))

    # A negative limit would otherwise slice from the end of the list.
    if limit is not None and limit < 0:
        limit = 0
    limited_results = all_results[:limit]

    formatted_lines = [
        f"Found {len(all_results)} matches for {len(compiled_patterns)} patterns, showing top {len(limited_results)} results:"
    ]

    # Display results grouped by file
    current_file = None
    for result in limited_results:
        file_path = result['file_path']
        if file_path != current_file:
            current_file = file_path
            formatted_lines.append(f"\n--- File: {os.path.basename(file_path)} ---")

        pattern = result.get('pattern', 'unknown')

        # The matched text is emitted first, followed by any context lines
        # (preceding context, then following context).
        formatted_lines.append(f"{result['match_line_number']}[pattern: {pattern}]:{result['matched_content']}")

        for context_line in result.get('context_before', []):
            formatted_lines.append(f"{context_line['line_number']}:{context_line['content']}")

        for context_line in result.get('context_after', []):
            formatted_lines.append(f"{context_line['line_number']}:{context_line['content']}")

    return text_response("\n".join(formatted_lines))
|
|
|
|
|
|
def regex_grep_count(patterns: Union[str, List[str]], file_paths: List[str],
                     case_sensitive: bool = False) -> Dict[str, Any]:
    """Count regex matches across files, broken down by pattern and by file.

    Args:
        patterns: A single regex or a list of regexes; blank entries are dropped
            and entries that fail to compile are skipped with a warning.
        file_paths: Paths handed to resolve_file_path; paths it rejects are skipped.
        case_sensitive: When False, patterns are compiled with re.IGNORECASE.

    Returns:
        A {"content": [{"type": "text", "text": ...}]} dictionary whose text is
        either an "Error: ..." message or the statistics report.
    """
    def text_response(text: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": text}]}

    # Accept a single pattern string as well as a list
    if isinstance(patterns, str):
        patterns = [patterns]

    # Strip the patterns and drop the blank ones
    patterns = [p.strip() for p in (patterns or []) if p.strip()]
    if not patterns:
        return text_response("Error: Patterns cannot be empty")

    if not file_paths:
        return text_response("Error: File path list cannot be empty")

    # Compile regular expressions
    flags = 0 if case_sensitive else re.IGNORECASE
    compiled_patterns = []
    for pattern in patterns:
        try:
            compiled_patterns.append((pattern, re.compile(pattern, flags)))
        except re.error as e:
            # NOTE(review): sent to stderr on the assumption that stdout carries
            # the JSON-RPC stream written by handle_mcp_streaming - confirm.
            print(f"Warning: Invalid regular expression '{pattern}': {str(e)}, skipping...", file=sys.stderr)

    if not compiled_patterns:
        return text_response("Error: No valid regular expressions found")

    # Resolve file paths (folder/document.txt and document.txt forms); skip failures.
    valid_paths = []
    for file_path in file_paths:
        try:
            valid_paths.append(resolve_file_path(file_path))
        except Exception:
            continue

    if not valid_paths:
        return text_response(f"Error: Specified files not found in project directory {get_allowed_directory()}")

    total_matches = 0
    total_lines_with_matches = 0
    # Keyed by resolved path: keying by basename would let two files that share
    # a name in different directories overwrite each other's statistics.
    file_stats = {}
    pattern_stats = {
        pattern: {'matches': 0, 'lines_with_matches': 0}
        for pattern, _ in compiled_patterns
    }

    for file_path in valid_paths:
        file_matches = 0
        file_lines_with_matches = 0

        try:
            for pattern, compiled_pattern in compiled_patterns:
                matches, lines_with_matches = regex_count_in_file(file_path, compiled_pattern, case_sensitive)
                total_matches += matches
                total_lines_with_matches += lines_with_matches
                file_matches += matches
                # NOTE(review): the per-file figure is the largest single-pattern
                # line count, while the overall total above sums the per-pattern
                # counts; neither is the exact number of distinct matching lines.
                file_lines_with_matches = max(file_lines_with_matches, lines_with_matches)

                # Update pattern statistics
                pattern_stats[pattern]['matches'] += matches
                pattern_stats[pattern]['lines_with_matches'] += lines_with_matches

            file_stats[file_path] = {
                'matches': file_matches,
                'lines_with_matches': file_lines_with_matches
            }
        except Exception:
            continue

    # Format the output
    formatted_lines = [
        "=== Regex Match Statistics ===",
        f"Patterns: {', '.join([p for p, _ in compiled_patterns])}",
        f"Files searched: {len(valid_paths)}",
        f"Total matches: {total_matches}",
        f"Total lines with matches: {total_lines_with_matches}",
        "",
    ]

    # Statistics by pattern
    formatted_lines.append("=== Statistics by Pattern ===")
    for pattern, stats in sorted(pattern_stats.items()):
        formatted_lines.append(f"Pattern: {pattern}")
        formatted_lines.append(f" Matches: {stats['matches']}")
        formatted_lines.append(f" Lines with matches: {stats['lines_with_matches']}")
        formatted_lines.append("")

    # Statistics by file, ordered by displayed name (full path breaks ties)
    formatted_lines.append("=== Statistics by File ===")
    for file_path, stats in sorted(file_stats.items(), key=lambda item: (os.path.basename(item[0]), item[0])):
        formatted_lines.append(f"File: {os.path.basename(file_path)}")
        formatted_lines.append(f" Matches: {stats['matches']}")
        formatted_lines.append(f" Lines with matches: {stats['lines_with_matches']}")
        formatted_lines.append("")

    return text_response("\n".join(formatted_lines))
|
|
|
|
|
|
def regex_search_in_file(file_path: str, pattern: re.Pattern,
                         context_lines: int, case_sensitive: bool, pattern_str: str = None) -> List[Dict[str, Any]]:
    """Find every match of a compiled regex in one file, with optional context.

    Args:
        file_path: File to read (UTF-8; undecodable bytes are ignored).
        pattern: Compiled regular expression applied to each line.
        context_lines: Number of lines to attach before and after a matching
            line; values of 0 or less attach no context.
        case_sensitive: Not used here; the flags of ``pattern`` decide matching.
        pattern_str: Label stored in each result ('unknown' when falsy).

    Returns:
        One dictionary per match, in file order, with 'file_path',
        'match_line_number', 'match_text', 'matched_content', 'pattern',
        'start_pos' and 'end_pos', plus 'context_before' / 'context_after'
        when those lists are non-empty. An unreadable file yields [].
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            raw_lines = handle.readlines()
    except Exception:
        return []

    texts = [raw.rstrip('\n\r') for raw in raw_lines]
    line_total = len(texts)
    label = pattern_str or 'unknown'
    found = []

    for index, text in enumerate(texts):
        hits = list(pattern.finditer(text))
        if not hits:
            continue

        before = []
        after = []
        if context_lines > 0:
            first = max(0, index - context_lines)
            stop = min(line_total, index + 1 + context_lines)
            before = [
                {'line_number': j + 1, 'content': texts[j]}
                for j in range(first, index)
            ]
            after = [
                {'line_number': j + 1, 'content': texts[j]}
                for j in range(index + 1, stop)
            ]

        # One entry per match; matches on the same line share the context lists.
        for hit in hits:
            entry = {
                'file_path': file_path,
                'match_line_number': index + 1,
                'match_text': text,
                'matched_content': hit.group(0),
                'pattern': label,
                'start_pos': hit.start(),
                'end_pos': hit.end()
            }
            if before:
                entry['context_before'] = before
            if after:
                entry['context_after'] = after
            found.append(entry)

    return found
|
|
|
|
|
|
def regex_count_in_file(file_path: str, pattern: re.Pattern,
                        case_sensitive: bool) -> tuple[int, int]:
    """Count regex matches in one file.

    Args:
        file_path: File to read (UTF-8; undecodable bytes are ignored).
        pattern: Compiled regular expression applied to each line.
        case_sensitive: Not used here; the flags of ``pattern`` decide matching.

    Returns:
        (total number of matches, number of lines with at least one match).
        An unreadable file yields (0, 0).
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            raw_lines = handle.readlines()
    except Exception:
        return 0, 0

    # Number of matches on each line, with the line terminator removed first.
    hits_per_line = [
        sum(1 for _ in pattern.finditer(raw.rstrip('\n\r')))
        for raw in raw_lines
    ]
    match_total = sum(hits_per_line)
    matching_lines = sum(1 for hits in hits_per_line if hits)
    return match_total, matching_lines
|
|
|
|
|
|
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Route one MCP JSON-RPC request to the matching handler.

    Handles the methods "initialize", "ping", "tools/list" and "tools/call"
    (tools: search, search_count, regex_grep, regex_grep_count). Unknown
    methods and tools produce a -32601 error response; any exception raised
    while handling produces a -32603 "Internal error" response.
    """
    try:
        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "multi-keyword-search")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Tool definitions live in a JSON file next to the server
            tool_definitions = load_tools_from_json("multi_keyword_search_tools.json")
            return create_tools_list_response(request_id, tool_definitions)

        if method != "tools/call":
            return create_error_response(request_id, -32601, f"Unknown method: {method}")

        tool_name = params.get("name")
        arguments = params.get("arguments", {})

        if tool_name == "search":
            result = search(
                arguments.get("patterns", []),
                arguments.get("file_paths", []),
                arguments.get("limit", 10),
                arguments.get("case_sensitive", False),
            )
        elif tool_name == "search_count":
            result = search_count(
                arguments.get("patterns", []),
                arguments.get("file_paths", []),
                arguments.get("case_sensitive", False),
            )
        elif tool_name in ("regex_grep", "regex_grep_count"):
            regex_patterns = arguments.get("patterns", [])
            # Backward-compatible support for the legacy pattern parameter
            if not regex_patterns and "pattern" in arguments:
                regex_patterns = arguments.get("pattern", "")
            target_files = arguments.get("file_paths", [])

            if tool_name == "regex_grep":
                result = regex_grep(
                    regex_patterns,
                    target_files,
                    arguments.get("context_lines", 0),
                    arguments.get("case_sensitive", False),
                    arguments.get("limit", 50),
                )
            else:
                result = regex_grep_count(
                    regex_patterns,
                    target_files,
                    arguments.get("case_sensitive", False),
                )
        else:
            return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")

        # Every successful tool call is wrapped in the same JSON-RPC envelope.
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result
        }

    except Exception as e:
        return create_error_response(request.get("id"), -32603, f"Internal error: {str(e)}")
|
|
|
|
|
|
async def main() -> None:
    """Main entry point.

    Passes handle_request to handle_mcp_streaming (imported from mcp_common)
    and waits for that coroutine to finish.
    """
    await handle_mcp_streaming(handle_request)
|
|
|
|
|
|
# Run main() on a new asyncio event loop only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
|