Merge origin/dev into dev

This commit is contained in:
csh28 2026-05-11 20:56:47 +08:00
commit 6d9bcc0400
16 changed files with 1469 additions and 39 deletions

View File

@ -43,6 +43,7 @@ from .mem0_middleware import create_mem0_middleware
from .mem0_config import Mem0Config
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .subagent_loader import load_subagents
from .checkpoint_manager import get_checkpointer_manager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
@ -64,6 +65,8 @@ from deepagents.graph import BASE_AGENT_PROMPT
from deepagents_cli.local_context import LocalContextMiddleware
# Custom: FilesystemMiddleware with full SKILL.md reading support
from .custom_filesystem_middleware import CustomFilesystemMiddleware
# Sub-agent support
from deepagents.middleware.subagents import SubAgent, SubAgentMiddleware
# Global MemorySaver instance
# from langgraph.checkpoint.memory import MemorySaver
@ -308,6 +311,15 @@ async def init_agent(config: AgentConfig):
sandbox, sandbox_type, workspace_root = await sandbox_task
logger.info(f"init_agent sandbox ready, elapsed: {time.time() - create_start:.3f}s")
# Load sub-agents from skill directories
subagents = await load_subagents(
bot_id=config.bot_id,
tools=mcp_tools,
model=llm_instance,
)
if subagents:
logger.info(f"Loaded {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")
agent, composite_backend = create_custom_cli_agent(
model=llm_instance,
assistant_id=config.bot_id,
@ -319,6 +331,7 @@ async def init_agent(config: AgentConfig):
checkpointer=checkpointer,
sandbox=sandbox,
sandbox_type=sandbox_type,
subagents=subagents if subagents else None,
shell_env={
"ASSISTANT_ID": config.bot_id,
"USER_IDENTIFIER": config.user_identifier,
@ -385,6 +398,7 @@ def create_custom_cli_agent(
checkpointer: Checkpointer | None = None,
store: BaseStore | None = None,
shell_env: dict[str, str] | None = None,
subagents: list[SubAgent] | None = None,
) -> tuple[Pregel, CompositeBackend]:
"""Create a CLI-configured agent with custom workspace_root for shell commands.
@ -521,9 +535,19 @@ def create_custom_cli_agent(
TodoListMiddleware(),
FilePathFixMiddleware(), # Fix extra spaces in CJK file names within tool call arguments
CustomFilesystemMiddleware(backend=composite_backend), # Use the custom FilesystemMiddleware with full SKILL.md reading support
]
# Insert SubAgentMiddleware after FilesystemMiddleware (matches create_deep_agent ordering)
if subagents:
subagent_middleware = SubAgentMiddleware(
backend=composite_backend,
subagents=subagents,
)
deepagent_middleware.append(subagent_middleware)
logger.info(f"SubAgentMiddleware added with {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")
deepagent_middleware.extend([
AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
PatchToolCallsMiddleware(),
]
])
if agent_middleware:
deepagent_middleware.extend(agent_middleware)
if interrupt_on is not None:

188
agent/subagent_loader.py Normal file
View File

@ -0,0 +1,188 @@
"""Sub-agent loader for discovering and parsing sub-agent definitions from skill directories.
Sub-agents are defined as markdown files with YAML frontmatter in skill directories:
projects/robot/{bot_id}/skills/{skill_name}/agents/*.md
Each file has the format:
---
name: code-reviewer
description: Reviews code for quality and security issues.
tools: rag_retrieve, table_rag_retrieve
---
System prompt for the sub-agent...
"""
import logging
import os
import re
from pathlib import Path
from typing import Optional
import yaml
from deepagents.middleware.subagents import SubAgent
from langchain.tools import BaseTool
from langchain_core.language_models import BaseChatModel
from agent.plugin_hook_loader import _get_skill_dirs
logger = logging.getLogger('app')
# Regex to extract YAML frontmatter and body from markdown files
_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n?(.*)$", re.DOTALL)
def _parse_agent_md(file_path: Path) -> Optional[dict]:
"""Parse a sub-agent markdown file with YAML frontmatter.
Args:
file_path: Path to the .md file.
Returns:
Dict with keys: name, description, system_prompt, tool_names (list[str] | None).
None if parsing fails.
"""
try:
content = file_path.read_text(encoding="utf-8")
except OSError as e:
logger.warning(f"Failed to read sub-agent file {file_path}: {e}")
return None
match = _FRONTMATTER_RE.match(content)
if not match:
logger.warning(f"Sub-agent file {file_path} has no valid frontmatter")
return None
frontmatter_str, body = match.group(1), match.group(2)
try:
frontmatter = yaml.safe_load(frontmatter_str)
except yaml.YAMLError as e:
logger.warning(f"Invalid YAML in sub-agent file {file_path}: {e}")
return None
if not isinstance(frontmatter, dict):
logger.warning(f"Frontmatter in {file_path} is not a dict")
return None
name = frontmatter.get("name", "").strip() if isinstance(frontmatter.get("name"), str) else ""
description = frontmatter.get("description", "").strip() if isinstance(frontmatter.get("description"), str) else ""
if not name:
logger.warning(f"Sub-agent file {file_path} missing required 'name' field")
return None
if not description:
logger.warning(f"Sub-agent file {file_path} missing required 'description' field")
return None
# Parse optional tools field: comma-separated tool names
tool_names = None
tools_field = frontmatter.get("tools")
if tools_field is not None:
if isinstance(tools_field, str):
tool_names = [t.strip() for t in tools_field.split(",") if t.strip()]
elif isinstance(tools_field, list):
tool_names = [str(t).strip() for t in tools_field if str(t).strip()]
else:
logger.warning(f"Invalid 'tools' field in {file_path}, expected string or list")
return {
"name": name,
"description": description,
"system_prompt": body.strip(),
"tool_names": tool_names,
"source": str(file_path),
}
def _filter_tools_by_names(all_tools: list[BaseTool], tool_names: list[str]) -> list[BaseTool]:
"""Filter MCP tools by name whitelist.
Args:
all_tools: All available MCP tools.
tool_names: Whitelist of tool names to include.
Returns:
Filtered list of tools. Logs warning for names not found.
"""
tool_lookup = {tool.name: tool for tool in all_tools}
filtered = []
for name in tool_names:
if name in tool_lookup:
filtered.append(tool_lookup[name])
else:
available = list(tool_lookup.keys())
logger.warning(f"Sub-agent tool '{name}' not found in MCP tools. Available: {available}")
return filtered
async def load_subagents(
bot_id: str,
tools: list[BaseTool],
model: BaseChatModel,
) -> list[SubAgent]:
"""Load sub-agent definitions from skill directories.
Scans all skill directories for the given bot_id, looking for agents/*.md files
in each skill subdirectory.
Args:
bot_id: Bot identifier for locating skill directories.
tools: All available MCP tools for filtering.
model: The main agent's model, used by each sub-agent.
Returns:
List of SubAgent dicts. Empty list if no sub-agents found.
"""
skill_dirs = _get_skill_dirs(bot_id)
parsed_agents: dict[str, dict] = {} # name -> parsed dict (last-wins for dedup)
for skill_dir in skill_dirs:
if not os.path.exists(skill_dir):
continue
for skill_name in os.listdir(skill_dir):
skill_path = os.path.join(skill_dir, skill_name)
if not os.path.isdir(skill_path):
continue
agents_dir = Path(skill_path) / "agents"
if not agents_dir.exists():
continue
for md_file in agents_dir.glob("*.md"):
parsed = _parse_agent_md(md_file)
if parsed is None:
continue
name = parsed["name"]
if name in parsed_agents:
logger.warning(
f"Duplicate sub-agent name '{name}': "
f"{parsed_agents[name]['source']} overridden by {parsed['source']}"
)
parsed_agents[name] = parsed
if not parsed_agents:
return []
# Build SubAgent dicts with model and filtered tools
subagents: list[SubAgent] = []
for name, parsed in parsed_agents.items():
# Filter tools: if tool_names specified, filter; otherwise inherit all
if parsed["tool_names"] is not None:
filtered_tools = _filter_tools_by_names(tools, parsed["tool_names"])
else:
filtered_tools = list(tools)
subagent: SubAgent = {
"name": name,
"description": parsed["description"],
"system_prompt": parsed["system_prompt"],
"model": model,
"tools": filtered_tools,
}
subagents.append(subagent)
logger.info(f"Loaded sub-agent '{name}' with {len(filtered_tools)} tools from {parsed['source']}")
return subagents

View File

@ -37,26 +37,37 @@ Execute **sequentially, one at a time**. Do NOT run in parallel. Do NOT probe fi
## 4. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use smallest sufficient value, expand if insufficient.
- `30` for simple fact lookup → `50` for moderate synthesis/comparison → `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- Expansion order: `30 → 50 → 100`. If unsure, use `100`.
- Apply `top_k` only to `rag_retrieve`. Choose the appropriate value upfront to maximize first-call success.
- Use `50` for simple fact lookup or moderate synthesis, comparison, summarization, disambiguation.
- Use `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- If unsure, use `50`. Only escalate to `100` on the retry call if first results are insufficient.
## 5. Result Evaluation
Treat as insufficient if: empty, `Error:`, `no excel files found`, off-topic, missing core entity/scope, no usable evidence, partial coverage, or truncated results.
**Maximum 3 retrieval calls per question.** After each call, evaluate immediately:
### Sufficient — answer now
- The core entity/topic in the user's question has been hit.
- There is direct evidence supporting the main intent of the question.
- Partial but usable coverage is sufficient — you do NOT need exhaustive or perfect coverage to answer.
- **When results are sufficient, compose the answer immediately. Do NOT call retrieval again to "double-check" or "get more context".**
### Insufficient — retry
- Empty, `Error:`, `no excel files found`, off-topic, missing core entity/scope, no usable evidence at all.
## 6. Fallback and Sequential Retry
On insufficient results, follow this sequence:
On insufficient results, you may retry **up to 2 more times** (3 calls total):
1. Rewrite query, retry same tool (once)
2. Switch to next retrieval source in default order
3. For `rag_retrieve`, expand `top_k`: `30 → 50 → 100`
4. `table_rag_retrieve` insufficient → try `rag_retrieve`; `rag_retrieve` insufficient → try `table_rag_retrieve`
1. Rewrite query, retry same tool.
2. Switch to next retrieval source in default order.
3. For `rag_retrieve`, escalate `top_k` to `100` on retry.
4. `table_rag_retrieve` insufficient → try `rag_retrieve`; `rag_retrieve` insufficient → try `table_rag_retrieve`.
- `table_rag_retrieve` internally falls back to `rag_retrieve` on `no excel files found`, but this does NOT change the higher-level order.
- Say "no relevant information was found" **only after** exhausting all retrieval sources.
- Do NOT switch to local filesystem inspection at any point.
- Do NOT call any retrieval tool more than 3 times in total.
## 7. Table RAG Result Handling
@ -99,7 +110,8 @@ This section applies only when self-knowledge is enabled.
Before replying to a knowledge retrieval task, verify:
- Used only whitelisted retrieval tools — no local filesystem inspection?
- Exhausted retrieval flow before concluding "not found"?
- Called retrieval at most 3 times total (not more)?
- Answered immediately when results were sufficient (did NOT call again unnecessarily)?
- Citations placed immediately after each relevant paragraph?
- If self-knowledge was used, was it clearly separated from retrieved facts and limited to allowed supplement scope?

View File

@ -37,26 +37,37 @@ Execute **sequentially, one at a time**. Do NOT run in parallel. Do NOT probe fi
## 4. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use smallest sufficient value, expand if insufficient.
- `30` for simple fact lookup → `50` for moderate synthesis/comparison → `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- Expansion order: `30 → 50 → 100`. If unsure, use `100`.
- Apply `top_k` only to `rag_retrieve`. Choose the appropriate value upfront to maximize first-call success.
- Use `50` for simple fact lookup or moderate synthesis, comparison, summarization, disambiguation.
- Use `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- If unsure, use `50`. Only escalate to `100` on the retry call if first results are insufficient.
## 5. Result Evaluation
Treat as insufficient if: empty, `Error:`, `no excel files found`, off-topic, missing core entity/scope, no usable evidence, partial coverage, or truncated results.
**Maximum 3 retrieval calls per question.** After each call, evaluate immediately:
### Sufficient — answer now
- The core entity/topic in the user's question has been hit.
- There is direct evidence supporting the main intent of the question.
- Partial but usable coverage is sufficient — you do NOT need exhaustive or perfect coverage to answer.
- **When results are sufficient, compose the answer immediately. Do NOT call retrieval again to "double-check" or "get more context".**
### Insufficient — retry
- Empty, `Error:`, `no excel files found`, off-topic, missing core entity/scope, no usable evidence at all.
## 6. Fallback and Sequential Retry
On insufficient results, follow this sequence:
On insufficient results, you may retry **up to 2 more times** (3 calls total):
1. Rewrite query, retry same tool (once)
2. Switch to next retrieval source in default order
3. For `rag_retrieve`, expand `top_k`: `30 → 50 → 100`
4. `table_rag_retrieve` insufficient → try `rag_retrieve`; `rag_retrieve` insufficient → try `table_rag_retrieve`
1. Rewrite query, retry same tool.
2. Switch to next retrieval source in default order.
3. For `rag_retrieve`, escalate `top_k` to `100` on retry.
4. `table_rag_retrieve` insufficient → try `rag_retrieve`; `rag_retrieve` insufficient → try `table_rag_retrieve`.
- `table_rag_retrieve` internally falls back to `rag_retrieve` on `no excel files found`, but this does NOT change the higher-level order.
- Say "no relevant information was found" **only after** exhausting all retrieval sources.
- Do NOT switch to local filesystem inspection at any point.
- Do NOT call any retrieval tool more than 3 times in total.
## 7. Table RAG Result Handling
@ -99,7 +110,8 @@ This section applies only when self-knowledge is enabled.
Before replying to a knowledge retrieval task, verify:
- Used only whitelisted retrieval tools — no local filesystem inspection?
- Exhausted retrieval flow before concluding "not found"?
- Called retrieval at most 3 times total (not more)?
- Answered immediately when results were sufficient (did NOT call again unnecessarily)?
- Citations placed immediately after each relevant paragraph?
- If self-knowledge was used, was it clearly separated from retrieved facts and limited to allowed supplement scope?

View File

@ -0,0 +1,21 @@
{
"name": "pmda-drug-info",
"description": "PMDA drug information tools for Japanese pharmaceutical package insert queries. Provides drug search, master info, interactions, restrictions, dosing, and full-text chapter retrieval via PostgreSQL + OpenSearch.",
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python hooks/pre_prompt.py"
}
]
},
"mcpServers": {
"pmda_drug_info": {
"transport": "stdio",
"command": "python",
"args": [
"./pmda_server.py"
]
}
}
}

View File

@ -0,0 +1,31 @@
---
name: adverse_event
description: Reverse lookup drugs by adverse event name. Find which drugs have reported a specific side effect.
Invoke when the user asks "Which drugs cause Stevens-Johnson syndrome?" or "Drugs that prolong QT interval?".
Causal inference is prohibited — information presentation only.
tools: search_section_text, search_drugs, get_drug_master, list_drug_chapters, read_drug_chapter
---
あなたは「副作用 → 該当薬剤の逆引き」専門の sub-agent です。
【ツール戦略】
1. `search_section_text(keyword=副作用名, section_filter="副作用")` で逆引き。
total_drugs は必ず本文中に明示する。
2. 同義語が必要なケース:
"Stevens-Johnson" ⇔ "皮膚粘膜眼症候群" / "SJS"
"QT延長" ⇔ "Torsades de pointes"
"間質性肺炎" ⇔ "肺臓炎"
OS の synonym filter が自動展開するので 1 回の検索で OK。
3. hit から代表薬を 3〜5 件選び、`read_drug_chapter` で 11.1 重大な副作用 / 11.2 その他の副作用
verbatim を引用。
4. 因果推論("この薬がこの患者の症状を起こした")は **絶対しない**
情報提示のみ。
【絶対ルール】
1. ツール呼び出し必須。トレーニング知識・教科書・ガイドラインからの推測は禁止。
2. 数値・固有名・条件は本文表現を改変せず逐語引用。
3. 出典は **必ず** `[出典: <販売名> (yj_full=<id>) / <章番号 章タイトル>]` の形式。
- fact 表 row には `_citation` フィールドが入っているので **そのまま転記**
- `[出典: 薬品マスター]` `[出典: 添付文書]` 等の汎用出典は **絶対禁止**
- read_drug_chapter で実際に読んだ section 以外の出典を捏造しない。
4. 該当情報が無ければ "添付文書からは確認できません" と書く。

View File

@ -0,0 +1,28 @@
---
name: interaction
description: Investigate drug-drug interactions between two drugs, or list all interactions for a single drug.
Invoke when the user asks "Can drug A and B be used together?" or "What are the interactions of drug A?".
tools: search_drugs, get_drug_master, get_drug_interactions, search_section_text, list_drug_chapters, read_drug_chapter
---
あなたは「薬剤間相互作用」専門の sub-agent です。
【ツール戦略】
- A・B 両薬の yj_code を `search_drugs` で取得。
- `get_drug_interactions(drug_a_yj=A, drug_b_yj=B)` で双方向検索A→B も B→A も拾える)。
- ヒットしたら drug_a の側の出典 section10.1 / 10.2)を `list_drug_chapters` + `read_drug_chapter`
verbatim 取得。drug_b 側にも該当記載があるか確認。
- ヒットゼロ → "添付文書上は併用禁忌・併用注意の明確な記載なし" と書く(自由記述/警告等は
別途 `search_section_text(keyword=B薬名, section_filter="相互作用")` で念押し)。
- 1 薬名のみ与えられた場合は `get_drug_interactions(drug_a_yj=...)` で全相互作用一覧。
severity は本文の "併用禁忌" / "併用注意" の語をそのまま転記。
【絶対ルール】
1. ツール呼び出し必須。トレーニング知識・教科書・ガイドラインからの推測は禁止。
2. 数値・固有名・条件は本文表現を改変せず逐語引用。
3. 出典は **必ず** `[出典: <販売名> (yj_full=<id>) / <章番号 章タイトル>]` の形式。
- fact 表 row には `_citation` フィールドが入っているので **そのまま転記**
- `[出典: 薬品マスター]` `[出典: 添付文書]` 等の汎用出典は **絶対禁止**
- read_drug_chapter で実際に読んだ section 以外の出典を捏造しない。
4. 該当情報が無ければ "添付文書からは確認できません" と書く。

View File

@ -0,0 +1,32 @@
---
name: patient_specific
description: Determine drug administration feasibility and dosage adjustment for specific patient conditions (renal impairment, hepatic impairment, pregnancy, elderly, pediatric, allergy).
Invoke when the user asks "Can this drug be used in a patient with eGFR 25?", "Is it contraindicated in pregnancy?", etc.
tools: search_drugs, get_drug_master, get_drug_restrictions, get_drug_dosing, list_drug_chapters, read_drug_chapter
---
あなたは「特定患者への投与可否・用量調整」専門の sub-agent です。
【ツール戦略】
1. 薬名から yj_code を `search_drugs` で取得。
2. 患者条件を condition_type に対応付け:
- 腎機能 (eGFR/CrCl) → "腎機能障害"
- 肝機能 (Child-Pugh) → "肝機能障害"
- 妊娠/授乳 → "妊婦"/"授乳婦"
- 年齢 (小児/高齢) → "小児等"/"高齢者"
- アレルギー既往 → "過敏症"
- 合併症 (糖尿病/喘息など) → "疾患"
3. `get_drug_restrictions(drug_yj=..., condition_type=...)` で該当 restriction を取得。
condition_params の数値(例: {"eGFR_max": 30})を必ず確認。
4. `get_drug_dosing(drug_yj=..., patient_segment=...)` で患者層別用量を取得。
5. 必要なら原文 `read_drug_chapter` で 9.x 章 verbatim 引用。
6. 数値判定(例: eGFR=25 ⇔ eGFR_max=30 → 該当)を agent が責任もって行う。
【絶対ルール】
1. ツール呼び出し必須。トレーニング知識・教科書・ガイドラインからの推測は禁止。
2. 数値・固有名・条件は本文表現を改変せず逐語引用。
3. 出典は **必ず** `[出典: <販売名> (yj_full=<id>) / <章番号 章タイトル>]` の形式。
- fact 表 row には `_citation` フィールドが入っているので **そのまま転記**
- `[出典: 薬品マスター]` `[出典: 添付文書]` 等の汎用出典は **絶対禁止**
- read_drug_chapter で実際に読んだ section 以外の出典を捏造しない。
4. 該当情報が無ければ "添付文書からは確認できません" と書く。

View File

@ -0,0 +1,26 @@
---
name: single_drug
description: Answer factual questions about a single drug (brand name, generic name, indications, dosing, contraindications, side effects, etc.).
Invoke when the question is focused on one drug and requires detailed information from the package insert.
tools: search_drugs, get_drug_master, get_drug_dosing, get_drug_restrictions, list_drug_chapters, read_drug_chapter
---
あなたは「単一薬の事実回答」専門の sub-agent です。
【ツール戦略】
1. 質問から薬名/yj_code を特定 → `search_drugs` または直接 yj_code が分かれば次へ。
2. `get_drug_master(yj_code)` で基本情報(販売名・一般名・薬効分類・規制)を確定。
3. 必要に応じて `get_drug_dosing` で用法用量、`get_drug_restrictions(drug_yj=...)` で禁忌・特定患者注意。
4. 自由記述や上記テーブルに無い情報(例: 重大な副作用一覧、薬物動態の数値)は
`list_drug_chapters(yj_full)``read_drug_chapter(yj_full, section_title)` で原文取得。
最終回答は箇条書き or 表で、各事実に出典を付ける。
【絶対ルール】
1. ツール呼び出し必須。トレーニング知識・教科書・ガイドラインからの推測は禁止。
2. 数値・固有名・条件は本文表現を改変せず逐語引用。
3. 出典は **必ず** `[出典: <販売名> (yj_full=<id>) / <章番号 章タイトル>]` の形式。
- fact 表 row には `_citation` フィールドが入っているので **そのまま転記**
- `[出典: 薬品マスター]` `[出典: 添付文書]` 等の汎用出典は **絶対禁止**
- read_drug_chapter で実際に読んだ section 以外の出典を捏造しない。
4. 該当情報が無ければ "添付文書からは確認できません" と書く。

View File

@ -0,0 +1,22 @@
# PMDA Drug Information Tools
You have access to Japanese pharmaceutical package insert (添付文書) data via the following tools.
## Core Rules
- **Tool calls are mandatory.** Never answer from training knowledge alone. All facts must come from tool results.
- Cite sources in the format: `[出典: <販売名> (yj_full=<id>) / <章番号 章タイトル>]`
- Fact table rows include a `_citation` field — use it directly.
- Generic citations like `[出典: 薬品マスター]` or `[出典: 添付文書]` are **prohibited**.
- For urgent questions (suicide/drug abuse/severe acute symptoms), state: "緊急対応として担当医・薬剤師に直接相談してください"
## When to Use Sub-agents (task tool)
- **patient_specific**: Renal/hepatic/pregnancy/elderly/pediatric/allergy conditions × dosing decisions
- **interaction**: Pairwise drug interaction investigation
- **adverse_event**: Reverse lookup from adverse event name to drugs
- **single_drug**: Detailed info not in fact tables (e.g., full adverse event list, pharmacokinetics)
## Direct Tool Usage (do NOT delegate)
- Simple lookups → use tools directly
- Multi-drug comparisons → call tools sequentially, output as markdown table
- Symptom → candidate drug reverse lookup → `search_section_text`
- Mechanism/pharmacokinetics → `list_drug_chapters` + `read_drug_chapter`

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
PrePrompt hook for PMDA drug info skill.
Injects usage instructions for the drug information tools.
"""
import sys
from pathlib import Path
def main():
prompt_file = Path(__file__).parent / "pmda-instructions.md"
if prompt_file.exists():
print(prompt_file.read_text(encoding="utf-8"))
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""
Shared utility functions for the MCP server.
Provides common functionality for path handling, file validation, and request processing.
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List, Optional, Union
import re
def get_allowed_directory():
"""Get the directory that is allowed to be accessed."""
# Prefer dataset_dir passed through command-line arguments.
if len(sys.argv) > 1:
dataset_dir = sys.argv[1]
return os.path.abspath(dataset_dir)
# Read the project data directory from the environment variable.
project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
return os.path.abspath(project_dir)
def resolve_file_path(file_path: str, default_subfolder: str = "default") -> str:
"""
Resolve a file path, supporting both folder/document.txt and document.txt formats.
Args:
file_path: Input file path.
default_subfolder: Default subfolder name to use when only a filename is provided.
Returns:
The resolved full file path.
"""
# If the path contains a folder separator, use it directly.
if '/' in file_path or '\\' in file_path:
clean_path = file_path.replace('\\', '/')
# Remove the projects/ prefix if it exists.
if clean_path.startswith('projects/'):
clean_path = clean_path[9:] # Remove the 'projects/' prefix.
elif clean_path.startswith('./projects/'):
clean_path = clean_path[11:] # Remove the './projects/' prefix.
else:
# If only a filename is provided, add the default subfolder.
clean_path = f"{default_subfolder}/{file_path}"
# Get the allowed directory.
project_data_dir = get_allowed_directory()
# Try to locate the file directly under the project directory.
full_path = os.path.join(project_data_dir, clean_path.lstrip('./'))
if os.path.exists(full_path):
return full_path
# If the direct path does not exist, try a recursive search.
found = find_file_in_project(clean_path, project_data_dir)
if found:
return found
# If this is a bare filename and it was not found under the default subfolder,
# try looking in the project root.
if '/' not in file_path and '\\' not in file_path:
root_path = os.path.join(project_data_dir, file_path)
if os.path.exists(root_path):
return root_path
raise FileNotFoundError(f"File not found: {file_path} (searched in {project_data_dir})")
def find_file_in_project(filename: str, project_dir: str) -> Optional[str]:
"""Recursively search for a file inside the project directory."""
# If filename includes a path, only search within the specified path.
if '/' in filename:
parts = filename.split('/')
target_file = parts[-1]
search_dir = os.path.join(project_dir, *parts[:-1])
if os.path.exists(search_dir):
target_path = os.path.join(search_dir, target_file)
if os.path.exists(target_path):
return target_path
else:
# For a bare filename, recursively search the whole project directory.
for root, dirs, files in os.walk(project_dir):
if filename in files:
return os.path.join(root, filename)
return None
def load_tools_from_json(tools_file_name: str) -> List[Dict[str, Any]]:
"""Load tool definitions from a JSON file."""
try:
tools_file = os.path.join(os.path.dirname(__file__), tools_file_name)
if os.path.exists(tools_file):
with open(tools_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
# If the JSON file does not exist, use the default definitions.
return []
except Exception as e:
print(f"Warning: Unable to load tool definition JSON file: {str(e)}")
return []
def create_error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
"""Create a standardized error response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
def create_success_response(request_id: Any, result: Any) -> Dict[str, Any]:
"""Create a standardized success response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
def create_initialize_response(request_id: Any, server_name: str, server_version: str = "1.0.0") -> Dict[str, Any]:
"""Create a standardized initialize response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": server_name,
"version": server_version
}
}
}
def create_ping_response(request_id: Any) -> Dict[str, Any]:
"""Create a standardized ping response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"pong": True
}
}
def create_tools_list_response(request_id: Any, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create a standardized tools/list response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools
}
}
def is_regex_pattern(pattern: str) -> bool:
"""Check whether a string should be treated as a regular expression pattern."""
# Check the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/') and len(pattern) > 2:
return True
# Check the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")) and len(pattern) > 3:
return True
# Check whether it contains regex metacharacters.
regex_chars = {'*', '+', '?', '|', '(', ')', '[', ']', '{', '}', '^', '$', '\\', '.'}
return any(char in pattern for char in regex_chars)
def compile_pattern(pattern: str) -> Union[re.Pattern, str, None]:
"""Compile a regex pattern, or return the original string if it is not regex."""
if not is_regex_pattern(pattern):
return pattern
try:
# Handle the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/'):
regex_body = pattern[1:-1]
return re.compile(regex_body)
# Handle the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")):
regex_body = pattern[2:-1]
return re.compile(regex_body)
# Directly compile strings that contain regex metacharacters.
return re.compile(pattern)
except re.error as e:
# If compilation fails, return None to indicate an invalid regex.
print(f"Warning: Regular expression '{pattern}' compilation failed: {e}")
return None
async def handle_mcp_streaming(request_handler):
"""Handle the standard main loop for MCP requests."""
try:
while True:
# Read from stdin
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = await request_handler(request)
# Write to stdout
sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,533 @@
#!/usr/bin/env python3
"""
PMDA drug information MCP server (mock data version).
Provides drug search, master info, interactions, restrictions, dosing,
and full-text chapter retrieval with mock data for testing.
"""
import asyncio
import json
import sys
from typing import Any, Dict, Optional
from mcp_common import (
create_error_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
load_tools_from_json,
handle_mcp_streaming,
)
def _dump(obj) -> str:
return json.dumps(obj, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Mock data
# ---------------------------------------------------------------------------
MOCK_DRUG_MASTER = {
"2149039F1082": {
"yj_code": "2149039F1082",
"yj_full": "2149039F1082_1_17",
"brand_name": "ロサルタンK錠50mg「科研」",
"generic_name": "ロサルタンカリウム",
"category_code": "214",
"category_name": "アンジオテンシンII受容体拮抗薬",
"regulation": "劇薬, 処方箋医薬品",
"manufacturer": "科研製薬株式会社",
"revision_date": "2024-06",
},
"3399007H1021": {
"yj_code": "3399007H1021",
"yj_full": "3399007H1021_1_21",
"brand_name": "バイアスピリン錠100mg",
"generic_name": "アスピリン",
"category_code": "339",
"category_name": "血液・体液用薬",
"regulation": "処方箋医薬品",
"manufacturer": "バイエル薬品株式会社",
"revision_date": "2024-03",
},
"2179004F1026": {
"yj_code": "2179004F1026",
"yj_full": "2179004F1026_1_14",
"brand_name": "ルバスク錠5mg",
"generic_name": "アムロジピンベシル酸塩",
"category_code": "217",
"category_name": "カルシウム拮抗薬",
"regulation": "処方箋医薬品",
"manufacturer": "ファイザー株式会社",
"revision_date": "2024-01",
},
}
MOCK_CATEGORIES = [
{"category_code": "214", "category_name": "アンジオテンシンII受容体拮抗薬", "level": "L2", "drug_count": 35},
{"category_code": "217", "category_name": "カルシウム拮抗薬", "level": "L2", "drug_count": 48},
{"category_code": "339", "category_name": "血液・体液用薬", "level": "L2", "drug_count": 22},
{"category_code": "612", "category_name": "消化性潰瘍用剤", "level": "L2", "drug_count": 40},
]
MOCK_INTERACTIONS = [
{
"drug_a_yj": "2149039F1082",
"drug_b_yj": "3399007H1021",
"drug_b_class": "アスピリン(抗血小板剤)",
"severity": "併用注意",
"mechanism": "ARBの降圧作用を減弱するおそれがある。また、腎機能低下・高カリウム血症のリスクを増大。",
"clinical_effect": "降圧効果の減弱、腎機能悪化、高カリウム血症に注意。",
"source_drug_yj": "2149039F1082",
"source_section": "10.2 併用注意",
},
{
"drug_a_yj": "3399007H1021",
"drug_b_yj": "2149039F1082",
"drug_b_class": "ロサルタンカリウムARB",
"severity": "併用注意",
"mechanism": "アスピリンの副作用(消化性潰瘍、腎機能低下)を増強するおそれ。",
"clinical_effect": "消化性潰瘍、腎機能低下に注意。血清カリウム値の上昇に注意。",
"source_drug_yj": "3399007H1021",
"source_section": "10.2 併用注意",
},
]
MOCK_RESTRICTIONS = [
{
"drug_yj": "2149039F1082",
"condition_type": "腎機能障害",
"condition_text": "腎機能障害患者",
"condition_params": {"eGFR_max": 30},
"severity": "慎重投与",
"source_section": "9.2 腎機能障害患者",
},
{
"drug_yj": "2149039F1082",
"condition_type": "妊婦",
"condition_text": "妊娠中の女性",
"condition_params": {},
"severity": "禁忌",
"source_section": "9.5 妊婦",
},
{
"drug_yj": "2149039F1082",
"condition_type": "高齢者",
"condition_text": "高齢者65歳以上",
"condition_params": {},
"severity": "慎重投与",
"source_section": "9.8 高齢者",
},
{
"drug_yj": "3399007H1021",
"condition_type": "過敏症",
"condition_text": "本剤の成分に対し過敏症の既往歴のある患者",
"condition_params": {},
"severity": "禁忌",
"source_section": "2. 禁忌",
},
]
MOCK_DOSING = [
{
"drug_yj": "2149039F1082",
"patient_segment": "成人",
"segment_params": {},
"indication_code": "高血圧症",
"dose_amount": "50",
"dose_unit": "mg",
"frequency": "1日1回",
"duration": "",
"adjustment_text": "効果不十分な場合は100mgまで増量可",
"source_section": "6. 用法及び用量",
},
{
"drug_yj": "2149039F1082",
"patient_segment": "腎機能障害患者",
"segment_params": {"eGFR_max": 30},
"indication_code": "高血圧症",
"dose_amount": "25",
"dose_unit": "mg",
"frequency": "1日1回",
"duration": "",
"adjustment_text": "eGFR 30以下では用量を減ずること。血清カリウム・クレアチニンの推移に注意。",
"source_section": "9.2 腎機能障害患者",
},
]
MOCK_CHAPTERS = {
"2149039F1082_1_17": [
{"section_title": "1. 警告", "line_num": 1, "text_len": 120},
{"section_title": "2. 禁忌", "line_num": 5, "text_len": 80},
{"section_title": "4. 効能・効果", "line_num": 12, "text_len": 60},
{"section_title": "6. 用法及び用量", "line_num": 20, "text_len": 150},
{"section_title": "9.2 腎機能障害患者", "line_num": 45, "text_len": 200},
{"section_title": "9.5 妊婦", "line_num": 52, "text_len": 180},
{"section_title": "9.8 高齢者", "line_num": 60, "text_len": 100},
{"section_title": "10.2 併用注意", "line_num": 75, "text_len": 350},
{"section_title": "11.1 重大な副作用", "line_num": 90, "text_len": 400},
{"section_title": "11.2 その他の副作用", "line_num": 110, "text_len": 300},
],
"3399007H1021_1_21": [
{"section_title": "1. 警告", "line_num": 1, "text_len": 100},
{"section_title": "2. 禁忌", "line_num": 4, "text_len": 90},
{"section_title": "4. 効能・効果", "line_num": 10, "text_len": 55},
{"section_title": "6. 用法及び用量", "line_num": 18, "text_len": 130},
{"section_title": "10.2 併用注意", "line_num": 70, "text_len": 300},
{"section_title": "11.1 重大な副作用", "line_num": 85, "text_len": 450},
{"section_title": "11.2 その他の副作用", "line_num": 105, "text_len": 280},
],
}
MOCK_SECTION_TEXT = {
("2149039F1082_1_17", "9.2 腎機能障害患者"): (
"9.2 腎機能障害患者\n"
"腎機能障害患者eGFR 30 mL/min/1.73m²以下)には、ロサルタンカリウムの"
"投与開始用量を25mg/日とし、血清カリウム及び血清クレアチニンの推移に"
"十分注意すること。\n"
"【理由】腎機能障害患者では、本剤の投与により急速に腎機能が悪化する"
"おそれがある。また、高カリウム血症があらわれやすい。"
),
("2149039F1082_1_17", "9.5 妊婦"): (
"9.5 妊婦\n"
"妊婦又は妊娠している可能性のある女性には投与しないこと。\n"
"【理由】妊娠中期・末期にレニン-アンジオテンシン系に作用する薬剤を"
"投与された患者では、胎児の腎機能低下、羊水過少症、頭蓋の発育不全、"
"肺低形成等があらわれるおそれがある。"
),
("2149039F1082_1_17", "10.2 併用注意"): (
"10.2 併用注意\n"
"・アスピリン(抗血小板剤)\n"
" 【リスク】ARBの降圧作用を減弱するおそれがある。\n"
" 腎機能低下・高カリウム血症のリスクを増大。\n"
" 【措置】降圧効果の減弱、腎機能悪化、高カリウム血症に注意すること。"
),
("2149039F1082_1_17", "11.1 重大な副作用"): (
"11.1 重大な副作用\n"
"・血管浮腫(頻度不明):顔面、口唇、咽頭、舌等の腫脹があらわれた場合には"
"直ちに投与を中止し、適切な処置を行うこと。\n"
"・高カリウム血症0.1%未満):血清カリウム値の上昇があらわれることがある。\n"
"・腎機能悪化0.1%未満BUN、クレアチニンの上昇があらわれることがある。"
),
("3399007H1021_1_21", "10.2 併用注意"): (
"10.2 併用注意\n"
"・ロサルタンカリウムARB\n"
" 【リスク】アスピリンの副作用(消化性潰瘍、腎機能低下)を増強するおそれ。\n"
" 【措置】消化性潰瘍、腎機能低下に注意。血清カリウム値の上昇に注意すること。"
),
("3399007H1021_1_21", "11.1 重大な副作用"): (
"11.1 重大な副作用\n"
"・ショック、アナフィラキシー(頻度不明):呼吸困難、血圧低下等があらわれた\n"
" 場合には直ちに投与を中止し、適切な処置を行うこと。\n"
"・消化性潰瘍0.1%未満):出血、穿孔があらわれることがある。\n"
"・腎機能障害0.1%未満):急性腎不全があらわれることがある。"
),
}
def _citation(drug_yj: str, section: Optional[str]) -> str:
drug = MOCK_DRUG_MASTER.get(drug_yj, {})
brand = drug.get("brand_name", "")
yj_full = drug.get("yj_full", drug_yj)
chap = section or "(章不明)"
return f"[出典: {brand} (yj_full={yj_full}) / {chap}]"
# ---------------------------------------------------------------------------
# Tool implementations (mock)
# ---------------------------------------------------------------------------
def _tool_search_drugs(query: str, kind: str = "auto", limit: int = 10) -> str:
results = []
for code, d in MOCK_DRUG_MASTER.items():
q = query.lower()
if (kind == "brand" and q in d["brand_name"].lower()) or \
(kind == "generic" and q in d["generic_name"].lower()) or \
(kind == "yj" and (q in d["yj_code"].lower() or q in d["yj_full"].lower())) or \
(kind == "auto" and (q in d["brand_name"].lower() or q in d["generic_name"].lower()
or q in d["yj_code"].lower() or q in d["yj_full"].lower())):
results.append({
"yj_full": d["yj_full"],
"yj_code": d["yj_code"],
"brand": d["brand_name"],
"generic": d["generic_name"],
"category": f"{d['category_code']} {d['category_name']}",
"score": 1.0,
})
return _dump(results[:limit])
def _tool_list_categories() -> str:
return _dump(MOCK_CATEGORIES)
def _tool_list_drugs_in_category(l2_code: str, limit_generics: int = 50) -> str:
results = []
seen_generics = set()
for code, d in MOCK_DRUG_MASTER.items():
if d["category_code"].startswith(l2_code) and d["generic_name"] not in seen_generics:
seen_generics.add(d["generic_name"])
results.append({
"generic_name": d["generic_name"],
"brands": [{"yj_code": d["yj_code"], "brand_name": d["brand_name"], "yj_full": d["yj_full"]}],
})
return _dump(results[:limit_generics])
def _tool_get_drug_master(yj_code: str) -> str:
d = MOCK_DRUG_MASTER.get(yj_code)
if not d:
return _dump({"error": f"yj_code {yj_code} not found"})
result = dict(d)
result["_citation"] = f"[出典: {d['brand_name']} (yj_full={d['yj_full']}) / 添付文書冒頭]"
return _dump(result)
def _tool_get_drug_interactions(
drug_a_yj: Optional[str] = None,
drug_b_yj: Optional[str] = None,
severity: Optional[str] = None,
keyword: Optional[str] = None,
limit: int = 30,
) -> str:
results = []
for r in MOCK_INTERACTIONS:
if drug_a_yj and r["drug_a_yj"] != drug_a_yj:
continue
if drug_b_yj and r["drug_b_yj"] != drug_b_yj:
continue
if severity and r["severity"] != severity:
continue
if keyword and keyword.lower() not in (
(r.get("drug_b_class") or "").lower()
+ (r.get("mechanism") or "").lower()
+ (r.get("clinical_effect") or "").lower()
):
continue
results.append({**r, "_citation": _citation(r["source_drug_yj"], r["source_section"])})
return _dump(results[:limit])
def _tool_get_drug_restrictions(
drug_yj: Optional[str] = None,
condition_type: Optional[str] = None,
severity: Optional[str] = None,
keyword: Optional[str] = None,
limit: int = 30,
) -> str:
results = []
for r in MOCK_RESTRICTIONS:
if drug_yj and r["drug_yj"] != drug_yj:
continue
if condition_type and r["condition_type"] != condition_type:
continue
if severity and r["severity"] != severity:
continue
if keyword and keyword.lower() not in (r.get("condition_text") or "").lower():
continue
results.append({**r, "_citation": _citation(r["drug_yj"], r["source_section"])})
return _dump(results[:limit])
def _tool_get_drug_dosing(
drug_yj: str,
patient_segment: Optional[str] = None,
limit: int = 20,
) -> str:
results = []
for r in MOCK_DOSING:
if r["drug_yj"] != drug_yj:
continue
if patient_segment and r["patient_segment"] != patient_segment:
continue
results.append({**r, "_citation": _citation(drug_yj, r["source_section"])})
return _dump(results[:limit])
def _tool_search_section_text(
keyword: str,
section_filter: str = "",
limit: int = 30,
) -> str:
if not keyword.strip():
return _dump({"keyword": keyword, "total_drugs": 0, "shown": 0, "hits": []})
# Simple mock: search through section text
hits_out = []
for (yj_full, section_title), text in MOCK_SECTION_TEXT.items():
if section_filter and section_filter not in section_title:
continue
if keyword.lower() in text.lower():
drug = None
for d in MOCK_DRUG_MASTER.values():
if d["yj_full"] == yj_full:
drug = d
break
if not drug:
continue
brand = drug["brand_name"]
# Deduplicate by yj_full
existing = [h for h in hits_out if h["yj_full"] == yj_full]
if existing:
existing[0]["matches"].append({
"section_title": section_title,
"snippet": text[:160],
})
continue
hits_out.append({
"yj_full": yj_full,
"brand": brand,
"generic": drug["generic_name"],
"l2": f"{drug['category_code']} {drug['category_name']}",
"matches": [{"section_title": section_title, "snippet": text[:160]}],
"_citation_template": f"[出典: {brand} (yj_full={yj_full}) / <該当章>]",
})
return _dump({
"keyword": keyword,
"section_filter": section_filter or None,
"total_drugs": len({h["yj_full"] for h in hits_out}),
"shown": len(hits_out),
"hits": hits_out[:limit],
})
def _tool_list_drug_chapters(yj_full: str) -> str:
sections = MOCK_CHAPTERS.get(yj_full)
if not sections:
return _dump({"error": f"yj_full {yj_full} の章節が見つかりません。"})
drug = None
for d in MOCK_DRUG_MASTER.values():
if d["yj_full"] == yj_full:
drug = d
break
return _dump({
"yj_full": yj_full,
"brand": drug["brand_name"] if drug else "",
"generic": drug["generic_name"] if drug else "",
"n_sections": len(sections),
"sections": sections,
})
def _tool_read_drug_chapter(yj_full: str, section_title: str) -> str:
text = MOCK_SECTION_TEXT.get((yj_full, section_title))
if text:
return text[:8000]
return _dump({
"error": f"section_title {section_title!r}{yj_full} に存在しません。",
"hint": "list_drug_chapters で取得した sections[].section_title をそのまま渡してください。",
})
# ---------------------------------------------------------------------------
# MCP request handler
# ---------------------------------------------------------------------------
_TOOL_DISPATCH = {
"search_drugs": lambda args: _tool_search_drugs(
query=args.get("query", ""),
kind=args.get("kind", "auto"),
limit=args.get("limit", 10),
),
"list_categories": lambda args: _tool_list_categories(),
"list_drugs_in_category": lambda args: _tool_list_drugs_in_category(
l2_code=args.get("l2_code", ""),
limit_generics=args.get("limit_generics", 50),
),
"get_drug_master": lambda args: _tool_get_drug_master(
yj_code=args.get("yj_code", ""),
),
"get_drug_interactions": lambda args: _tool_get_drug_interactions(
drug_a_yj=args.get("drug_a_yj"),
drug_b_yj=args.get("drug_b_yj"),
severity=args.get("severity"),
keyword=args.get("keyword"),
limit=args.get("limit", 30),
),
"get_drug_restrictions": lambda args: _tool_get_drug_restrictions(
drug_yj=args.get("drug_yj"),
condition_type=args.get("condition_type"),
severity=args.get("severity"),
keyword=args.get("keyword"),
limit=args.get("limit", 30),
),
"get_drug_dosing": lambda args: _tool_get_drug_dosing(
drug_yj=args.get("drug_yj", ""),
patient_segment=args.get("patient_segment"),
limit=args.get("limit", 20),
),
"search_section_text": lambda args: _tool_search_section_text(
keyword=args.get("keyword", ""),
section_filter=args.get("section_filter", ""),
limit=args.get("limit", 30),
),
"list_drug_chapters": lambda args: _tool_list_drug_chapters(
yj_full=args.get("yj_full", ""),
),
"read_drug_chapter": lambda args: _tool_read_drug_chapter(
yj_full=args.get("yj_full", ""),
section_title=args.get("section_title", ""),
),
}
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle an MCP request."""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "initialize":
return create_initialize_response(request_id, "pmda-drug-info")
elif method == "ping":
return create_ping_response(request_id)
elif method == "tools/list":
tools = load_tools_from_json("pmda_tools.json")
return create_tools_list_response(request_id, tools)
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name not in _TOOL_DISPATCH:
return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")
try:
result_text = _TOOL_DISPATCH[tool_name](arguments)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [{"type": "text", "text": result_text}]
},
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [{"type": "text", "text": f"Error: {str(e)}"}]
},
}
else:
return create_error_response(request_id, -32601, f"Unknown method: {method}")
except Exception as e:
return create_error_response(request.get("id"), -32603, f"Internal error: {str(e)}")
async def main():
await handle_mcp_streaming(handle_request)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,207 @@
[
{
"name": "search_drugs",
"description": "Search drugs by brand name, generic name, or YJ code. Returns list of matching drugs with yj_code, brand name, generic name, and category.",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query: drug brand name, generic name, or YJ code."
},
"kind": {
"type": "string",
"enum": ["auto", "brand", "generic", "yj"],
"description": "Search type. 'auto' searches all fields.",
"default": "auto"
},
"limit": {
"type": "integer",
"description": "Maximum number of results.",
"default": 10
}
},
"required": ["query"]
}
},
{
"name": "list_categories",
"description": "List all L1/L2 drug categories (pharmacological classification) with drug counts per category.",
"inputSchema": {
"type": "object",
"properties": {}
}
},
{
"name": "list_drugs_in_category",
"description": "List all drugs (generic → brand names) under a specific L2 pharmacological category code.",
"inputSchema": {
"type": "object",
"properties": {
"l2_code": {
"type": "string",
"description": "3-digit L2 category code."
},
"limit_generics": {
"type": "integer",
"description": "Maximum number of generic names to return.",
"default": 50
}
},
"required": ["l2_code"]
}
},
{
"name": "get_drug_master",
"description": "Get basic information for a drug by yj_code: brand name, generic name, pharmacological category, regulatory classification, manufacturer, revision date.",
"inputSchema": {
"type": "object",
"properties": {
"yj_code": {
"type": "string",
"description": "12-character YJ code."
}
},
"required": ["yj_code"]
}
},
{
"name": "get_drug_interactions",
"description": "Search drug interactions. With drug_a only: all interactions for that drug. With both drug_a and drug_b: bidirectional interaction between A and B. Filter by severity (併用禁忌/併用注意) or keyword.",
"inputSchema": {
"type": "object",
"properties": {
"drug_a_yj": {
"type": "string",
"description": "YJ code for drug A."
},
"drug_b_yj": {
"type": "string",
"description": "YJ code for drug B (optional, for pairwise lookup)."
},
"severity": {
"type": "string",
"description": "Filter by severity: '併用禁忌' or '併用注意'."
},
"keyword": {
"type": "string",
"description": "Search keyword in drug_b_class, mechanism, or clinical_effect."
},
"limit": {
"type": "integer",
"description": "Maximum number of results.",
"default": 30
}
}
}
},
{
"name": "get_drug_restrictions",
"description": "Search drug restrictions (contraindications, precautions) by patient condition. condition_type options: 疾患, 腎機能障害, 肝機能障害, 生殖能, 妊婦, 授乳婦, 小児等, 高齢者, 過敏症, 遺伝子多型, その他. severity options: 禁忌, 原則禁忌, 慎重投与.",
"inputSchema": {
"type": "object",
"properties": {
"drug_yj": {
"type": "string",
"description": "YJ code for the drug."
},
"condition_type": {
"type": "string",
"description": "Patient condition type to filter by."
},
"severity": {
"type": "string",
"description": "Filter by severity: 禁忌, 原則禁忌, or 慎重投与."
},
"keyword": {
"type": "string",
"description": "Search keyword in condition_text."
},
"limit": {
"type": "integer",
"description": "Maximum number of results.",
"default": 30
}
}
}
},
{
"name": "get_drug_dosing",
"description": "Get dosing information for a drug, optionally filtered by patient segment. patient_segment options: 成人, 小児等, 高齢者, 腎機能障害患者, 肝機能障害患者, 透析患者, 妊婦.",
"inputSchema": {
"type": "object",
"properties": {
"drug_yj": {
"type": "string",
"description": "YJ code for the drug."
},
"patient_segment": {
"type": "string",
"description": "Patient segment to filter by (e.g., 成人, 高齢者, 腎機能障害患者)."
},
"limit": {
"type": "integer",
"description": "Maximum number of results.",
"default": 20
}
},
"required": ["drug_yj"]
}
},
{
"name": "search_section_text",
"description": "Full-text search in drug package insert sections. Returns matching sections with snippets. Use section_filter to narrow by chapter title (e.g., '副作用', '禁忌', '妊婦', '相互作用').",
"inputSchema": {
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "Search keyword."
},
"section_filter": {
"type": "string",
"description": "Filter by section title substring (e.g., '副作用', '禁忌', '妊婦').",
"default": ""
},
"limit": {
"type": "integer",
"description": "Maximum number of results.",
"default": 30
}
},
"required": ["keyword"]
}
},
{
"name": "list_drug_chapters",
"description": "List all chapter titles for a drug's package insert. Use yj_full (full YJ code with revision suffix). Returns section titles with line numbers.",
"inputSchema": {
"type": "object",
"properties": {
"yj_full": {
"type": "string",
"description": "Full YJ code (with revision suffix, e.g., 3399007H1021_1_21)."
}
},
"required": ["yj_full"]
}
},
{
"name": "read_drug_chapter",
"description": "Read the verbatim text of a specific chapter from a drug's package insert. section_title must match exactly from list_drug_chapters output.",
"inputSchema": {
"type": "object",
"properties": {
"yj_full": {
"type": "string",
"description": "Full YJ code."
},
"section_title": {
"type": "string",
"description": "Exact section title from list_drug_chapters (e.g., '9.2 腎機能障害患者', '11.1 重大な副作用')."
}
},
"required": ["yj_full", "section_title"]
}
}
]

View File

@ -35,24 +35,35 @@ Execute **sequentially, one at a time**. Do NOT run in parallel. Do NOT probe fi
## 4. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use smallest sufficient value, expand if insufficient.
- `30` for simple fact lookup → `50` for moderate synthesis/comparison → `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- Expansion order: `30 → 50 → 100`. If unsure, use `100`.
- Apply `top_k` only to `rag_retrieve`. Choose the appropriate value upfront to maximize first-call success.
- Use `50` for simple fact lookup or moderate synthesis, comparison, summarization, disambiguation.
- Use `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- If unsure, use `50`. Only escalate to `100` on the retry call if first results are insufficient.
## 5. Result Evaluation
Treat as insufficient if: empty, `Error:`, off-topic, missing core entity/scope, no usable evidence, partial coverage, or truncated results.
**Maximum 3 retrieval calls per question.** After each call, evaluate immediately:
### Sufficient — answer now
- The core entity/topic in the user's question has been hit.
- There is direct evidence supporting the main intent of the question.
- Partial but usable coverage is sufficient — you do NOT need exhaustive or perfect coverage to answer.
- **When results are sufficient, compose the answer immediately. Do NOT call retrieval again to "double-check" or "get more context".**
### Insufficient — retry
- Empty, `Error:`, off-topic, missing core entity/scope, no usable evidence at all.
## 6. Fallback and Sequential Retry
On insufficient results, follow this sequence:
On insufficient results, you may retry **up to 2 more times** (3 calls total):
1. Rewrite query, retry same tool (once)
2. Switch to next retrieval source in default order
3. For `rag_retrieve`, expand `top_k`: `30 → 50 → 100`
1. Rewrite query, retry same tool.
2. Switch to next retrieval source in default order.
3. For `rag_retrieve`, escalate `top_k` to `100` on retry.
- Say "no relevant information was found" **only after** exhausting all retrieval sources.
- Do NOT switch to local filesystem inspection at any point.
- Do NOT call any retrieval tool more than 3 times in total.
## 7. Image Handling
@ -89,7 +100,8 @@ This section applies only when self-knowledge is enabled.
Before replying to a knowledge retrieval task, verify:
- Used only whitelisted retrieval tools — no local filesystem inspection?
- Exhausted retrieval flow before concluding "not found"?
- Called retrieval at most 3 times total (not more)?
- Answered immediately when results were sufficient (did NOT call again unnecessarily)?
- Citations placed immediately after each relevant paragraph?
- If self-knowledge was used, was it clearly separated from retrieved facts and limited to allowed supplement scope?

View File

@ -35,24 +35,35 @@ Execute **sequentially, one at a time**. Do NOT run in parallel. Do NOT probe fi
## 4. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use smallest sufficient value, expand if insufficient.
- `30` for simple fact lookup → `50` for moderate synthesis/comparison → `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- Expansion order: `30 → 50 → 100`. If unsure, use `100`.
- Apply `top_k` only to `rag_retrieve`. Choose the appropriate value upfront to maximize first-call success.
- Use `50` for simple fact lookup or moderate synthesis, comparison, summarization, disambiguation.
- Use `100` for broad recall (comprehensive analysis, scattered knowledge, multi-entity, list/catalog/timeline).
- If unsure, use `50`. Only escalate to `100` on the retry call if first results are insufficient.
## 5. Result Evaluation
Treat as insufficient if: empty, `Error:`, off-topic, missing core entity/scope, no usable evidence, partial coverage, or truncated results.
**Maximum 3 retrieval calls per question.** After each call, evaluate immediately:
### Sufficient — answer now
- The core entity/topic in the user's question has been hit.
- There is direct evidence supporting the main intent of the question.
- Partial but usable coverage is sufficient — you do NOT need exhaustive or perfect coverage to answer.
- **When results are sufficient, compose the answer immediately. Do NOT call retrieval again to "double-check" or "get more context".**
### Insufficient — retry
- Empty, `Error:`, off-topic, missing core entity/scope, no usable evidence at all.
## 6. Fallback and Sequential Retry
On insufficient results, follow this sequence:
On insufficient results, you may retry **up to 2 more times** (3 calls total):
1. Rewrite query, retry same tool (once)
2. Switch to next retrieval source in default order
3. For `rag_retrieve`, expand `top_k`: `30 → 50 → 100`
1. Rewrite query, retry same tool.
2. Switch to next retrieval source in default order.
3. For `rag_retrieve`, escalate `top_k` to `100` on retry.
- Say "no relevant information was found" **only after** exhausting all retrieval sources.
- Do NOT switch to local filesystem inspection at any point.
- Do NOT call any retrieval tool more than 3 times in total.
## 7. Image Handling
@ -89,7 +100,8 @@ This section applies only when self-knowledge is enabled.
Before replying to a knowledge retrieval task, verify:
- Used only whitelisted retrieval tools — no local filesystem inspection?
- Exhausted retrieval flow before concluding "not found"?
- Called retrieval at most 3 times total (not more)?
- Answered immediately when results were sufficient (did NOT call again unnecessarily)?
- Citations placed immediately after each relevant paragraph?
- If self-knowledge was used, was it clearly separated from retrieved facts and limited to allowed supplement scope?