From a92096a646f2d9bee7f4793fcb3526ea6275ce3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Mon, 11 May 2026 19:15:03 +0800 Subject: [PATCH] =?UTF-8?q?pmda-drug-infomock=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../developing/pmda-drug-info/pmda_server.py | 689 ++++++++---------- 1 file changed, 313 insertions(+), 376 deletions(-) diff --git a/skills/developing/pmda-drug-info/pmda_server.py b/skills/developing/pmda-drug-info/pmda_server.py index 21506f9..0255adc 100644 --- a/skills/developing/pmda-drug-info/pmda_server.py +++ b/skills/developing/pmda-drug-info/pmda_server.py @@ -1,20 +1,15 @@ #!/usr/bin/env python3 """ -PMDA drug information MCP server. +PMDA drug information MCP server (mock data version). + Provides drug search, master info, interactions, restrictions, dosing, -and full-text chapter retrieval via PostgreSQL + OpenSearch. +and full-text chapter retrieval with mock data for testing. """ import asyncio import json -import os import sys -from decimal import Decimal -from typing import Any, Dict, List, Optional - -import psycopg2 -import psycopg2.extras -from opensearchpy import OpenSearch +from typing import Any, Dict, Optional from mcp_common import ( create_error_response, @@ -25,189 +20,269 @@ from mcp_common import ( handle_mcp_streaming, ) -# --------------------------------------------------------------------------- -# Configuration from environment variables -# --------------------------------------------------------------------------- -PG_DSN = os.getenv("PMDA_PG_DSN", "") -OS_HOST = os.getenv("PMDA_OS_HOST", "localhost") -OS_PORT = int(os.getenv("PMDA_OS_PORT", "9200")) -OS_INDEX = os.getenv("PMDA_OS_INDEX", "pmda_sections") - - -def _json_default(o): - """JSON serializer for objects not serializable by default json code.""" - if isinstance(o, Decimal): - return float(o) - raise TypeError(f"non-serializable: {type(o).__name__}") - def _dump(obj) -> str: - return json.dumps(obj, ensure_ascii=False, default=_json_default) + return json.dumps(obj, ensure_ascii=False) # --------------------------------------------------------------------------- -# Lazy database connections +# Mock data # --------------------------------------------------------------------------- -_pg_conn = None -_os_client = None -# Drug lookup cache: yj_code -> (brand_name, yj_full) -_drug_lookup: Optional[Dict[str, tuple]] = None +MOCK_DRUG_MASTER = { + "2149039F1082": { + "yj_code": "2149039F1082", + "yj_full": "2149039F1082_1_17", + "brand_name": "ロサルタンK錠50mg「科研」", + "generic_name": "ロサルタンカリウム", + "category_code": "214", + "category_name": "アンジオテンシンII受容体拮抗薬", + "regulation": "劇薬, 処方箋医薬品", + "manufacturer": "科研製薬株式会社", + "revision_date": "2024-06", + }, + "3399007H1021": { + "yj_code": "3399007H1021", + "yj_full": "3399007H1021_1_21", + "brand_name": "バイアスピリン錠100mg", + "generic_name": "アスピリン", + "category_code": "339", + "category_name": "血液・体液用薬", + "regulation": "処方箋医薬品", + "manufacturer": "バイエル薬品株式会社", + "revision_date": "2024-03", + }, + "2179004F1026": { + "yj_code": "2179004F1026", + "yj_full": "2179004F1026_1_14", + "brand_name": "ノルバスク錠5mg", + "generic_name": "アムロジピンベシル酸塩", + "category_code": "217", + "category_name": "カルシウム拮抗薬", + "regulation": "処方箋医薬品", + "manufacturer": "ファイザー株式会社", + "revision_date": "2024-01", + }, +} +MOCK_CATEGORIES = [ + {"category_code": "214", "category_name": "アンジオテンシンII受容体拮抗薬", "level": "L2", "drug_count": 35}, + {"category_code": "217", "category_name": "カルシウム拮抗薬", "level": "L2", "drug_count": 48}, + {"category_code": "339", "category_name": "血液・体液用薬", "level": "L2", "drug_count": 22}, + {"category_code": "612", "category_name": "消化性潰瘍用剤", "level": "L2", "drug_count": 40}, +] -def _get_pg(): - global _pg_conn - if _pg_conn is None or _pg_conn.closed: - if not PG_DSN: - raise RuntimeError("PMDA_PG_DSN environment variable is not set") - _pg_conn = psycopg2.connect(PG_DSN) - _pg_conn.autocommit = True - return _pg_conn +MOCK_INTERACTIONS = [ + { + "drug_a_yj": "2149039F1082", + "drug_b_yj": "3399007H1021", + "drug_b_class": "アスピリン(抗血小板剤)", + "severity": "併用注意", + "mechanism": "ARBの降圧作用を減弱するおそれがある。また、腎機能低下・高カリウム血症のリスクを増大。", + "clinical_effect": "降圧効果の減弱、腎機能悪化、高カリウム血症に注意。", + "source_drug_yj": "2149039F1082", + "source_section": "10.2 併用注意", + }, + { + "drug_a_yj": "3399007H1021", + "drug_b_yj": "2149039F1082", + "drug_b_class": "ロサルタンカリウム(ARB)", + "severity": "併用注意", + "mechanism": "アスピリンの副作用(消化性潰瘍、腎機能低下)を増強するおそれ。", + "clinical_effect": "消化性潰瘍、腎機能低下に注意。血清カリウム値の上昇に注意。", + "source_drug_yj": "3399007H1021", + "source_section": "10.2 併用注意", + }, +] +MOCK_RESTRICTIONS = [ + { + "drug_yj": "2149039F1082", + "condition_type": "腎機能障害", + "condition_text": "腎機能障害患者", + "condition_params": {"eGFR_max": 30}, + "severity": "慎重投与", + "source_section": "9.2 腎機能障害患者", + }, + { + "drug_yj": "2149039F1082", + "condition_type": "妊婦", + "condition_text": "妊娠中の女性", + "condition_params": {}, + "severity": "禁忌", + "source_section": "9.5 妊婦", + }, + { + "drug_yj": "2149039F1082", + "condition_type": "高齢者", + "condition_text": "高齢者(65歳以上)", + "condition_params": {}, + "severity": "慎重投与", + "source_section": "9.8 高齢者", + }, + { + "drug_yj": "3399007H1021", + "condition_type": "過敏症", + "condition_text": "本剤の成分に対し過敏症の既往歴のある患者", + "condition_params": {}, + "severity": "禁忌", + "source_section": "2. 禁忌", + }, +] -def _get_os() -> OpenSearch: - global _os_client - if _os_client is None: - _os_client = OpenSearch( - hosts=[{"host": OS_HOST, "port": OS_PORT}], - use_ssl=False, - verify_certs=False, - ) - return _os_client +MOCK_DOSING = [ + { + "drug_yj": "2149039F1082", + "patient_segment": "成人", + "segment_params": {}, + "indication_code": "高血圧症", + "dose_amount": "50", + "dose_unit": "mg", + "frequency": "1日1回", + "duration": "", + "adjustment_text": "効果不十分な場合は100mgまで増量可", + "source_section": "6. 用法及び用量", + }, + { + "drug_yj": "2149039F1082", + "patient_segment": "腎機能障害患者", + "segment_params": {"eGFR_max": 30}, + "indication_code": "高血圧症", + "dose_amount": "25", + "dose_unit": "mg", + "frequency": "1日1回", + "duration": "", + "adjustment_text": "eGFR 30以下では用量を減ずること。血清カリウム・クレアチニンの推移に注意。", + "source_section": "9.2 腎機能障害患者", + }, +] +MOCK_CHAPTERS = { + "2149039F1082_1_17": [ + {"section_title": "1. 警告", "line_num": 1, "text_len": 120}, + {"section_title": "2. 禁忌", "line_num": 5, "text_len": 80}, + {"section_title": "4. 効能・効果", "line_num": 12, "text_len": 60}, + {"section_title": "6. 用法及び用量", "line_num": 20, "text_len": 150}, + {"section_title": "9.2 腎機能障害患者", "line_num": 45, "text_len": 200}, + {"section_title": "9.5 妊婦", "line_num": 52, "text_len": 180}, + {"section_title": "9.8 高齢者", "line_num": 60, "text_len": 100}, + {"section_title": "10.2 併用注意", "line_num": 75, "text_len": 350}, + {"section_title": "11.1 重大な副作用", "line_num": 90, "text_len": 400}, + {"section_title": "11.2 その他の副作用", "line_num": 110, "text_len": 300}, + ], + "3399007H1021_1_21": [ + {"section_title": "1. 警告", "line_num": 1, "text_len": 100}, + {"section_title": "2. 禁忌", "line_num": 4, "text_len": 90}, + {"section_title": "4. 効能・効果", "line_num": 10, "text_len": 55}, + {"section_title": "6. 用法及び用量", "line_num": 18, "text_len": 130}, + {"section_title": "10.2 併用注意", "line_num": 70, "text_len": 300}, + {"section_title": "11.1 重大な副作用", "line_num": 85, "text_len": 450}, + {"section_title": "11.2 その他の副作用", "line_num": 105, "text_len": 280}, + ], +} -def _load_drug_lookup() -> Dict[str, tuple]: - """Load yj_code -> (brand_name, yj_full) mapping from drug_master.""" - global _drug_lookup - if _drug_lookup is not None: - return _drug_lookup - conn = _get_pg() - with conn.cursor() as cur: - cur.execute("SELECT yj_code, brand_name, yj_full FROM drug_master") - _drug_lookup = { - row[0]: (row[1] or "", row[2] or row[0]) for row in cur.fetchall() - } - return _drug_lookup +MOCK_SECTION_TEXT = { + ("2149039F1082_1_17", "9.2 腎機能障害患者"): ( + "9.2 腎機能障害患者\n" + "腎機能障害患者(eGFR 30 mL/min/1.73m²以下)には、ロサルタンカリウムの" + "投与開始用量を25mg/日とし、血清カリウム及び血清クレアチニンの推移に" + "十分注意すること。\n" + "【理由】腎機能障害患者では、本剤の投与により急速に腎機能が悪化する" + "おそれがある。また、高カリウム血症があらわれやすい。" + ), + ("2149039F1082_1_17", "9.5 妊婦"): ( + "9.5 妊婦\n" + "妊婦又は妊娠している可能性のある女性には投与しないこと。\n" + "【理由】妊娠中期・末期にレニン-アンジオテンシン系に作用する薬剤を" + "投与された患者では、胎児の腎機能低下、羊水過少症、頭蓋の発育不全、" + "肺低形成等があらわれるおそれがある。" + ), + ("2149039F1082_1_17", "10.2 併用注意"): ( + "10.2 併用注意\n" + "・アスピリン(抗血小板剤)\n" + " 【リスク】ARBの降圧作用を減弱するおそれがある。\n" + " 腎機能低下・高カリウム血症のリスクを増大。\n" + " 【措置】降圧効果の減弱、腎機能悪化、高カリウム血症に注意すること。" + ), + ("2149039F1082_1_17", "11.1 重大な副作用"): ( + "11.1 重大な副作用\n" + "・血管浮腫(頻度不明):顔面、口唇、咽頭、舌等の腫脹があらわれた場合には" + "直ちに投与を中止し、適切な処置を行うこと。\n" + "・高カリウム血症(0.1%未満):血清カリウム値の上昇があらわれることがある。\n" + "・腎機能悪化(0.1%未満):BUN、クレアチニンの上昇があらわれることがある。" + ), + ("3399007H1021_1_21", "10.2 併用注意"): ( + "10.2 併用注意\n" + "・ロサルタンカリウム(ARB)\n" + " 【リスク】アスピリンの副作用(消化性潰瘍、腎機能低下)を増強するおそれ。\n" + " 【措置】消化性潰瘍、腎機能低下に注意。血清カリウム値の上昇に注意すること。" + ), + ("3399007H1021_1_21", "11.1 重大な副作用"): ( + "11.1 重大な副作用\n" + "・ショック、アナフィラキシー(頻度不明):呼吸困難、血圧低下等があらわれた\n" + " 場合には直ちに投与を中止し、適切な処置を行うこと。\n" + "・消化性潰瘍(0.1%未満):出血、穿孔があらわれることがある。\n" + "・腎機能障害(0.1%未満):急性腎不全があらわれることがある。" + ), +} def _citation(drug_yj: str, section: Optional[str]) -> str: - """Format citation string: [出典: (yj_full=) /
]""" - lk = _load_drug_lookup() - brand, yj_full = lk.get(drug_yj, ("", drug_yj)) + drug = MOCK_DRUG_MASTER.get(drug_yj, {}) + brand = drug.get("brand_name", "") + yj_full = drug.get("yj_full", drug_yj) chap = section or "(章不明)" return f"[出典: {brand} (yj_full={yj_full}) / {chap}]" # --------------------------------------------------------------------------- -# Tool implementations +# Tool implementations (mock) # --------------------------------------------------------------------------- def _tool_search_drugs(query: str, kind: str = "auto", limit: int = 10) -> str: - """Search drugs by brand name, generic name, or YJ code.""" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - if kind == "yj": - cur.execute( - "SELECT yj_full, yj_code, brand_name, generic_name, " - "category_code, category_name FROM drug_master " - "WHERE yj_code ILIKE %s OR yj_full ILIKE %s LIMIT %s", - (f"%{query}%", f"%{query}%", limit), - ) - elif kind == "brand": - cur.execute( - "SELECT yj_full, yj_code, brand_name, generic_name, " - "category_code, category_name, " - "similarity(brand_name, %s) AS score " - "FROM drug_master " - "WHERE brand_name ILIKE %s ORDER BY score DESC LIMIT %s", - (query, f"%{query}%", limit), - ) - elif kind == "generic": - cur.execute( - "SELECT yj_full, yj_code, brand_name, generic_name, " - "category_code, category_name, " - "similarity(generic_name, %s) AS score " - "FROM drug_master " - "WHERE generic_name ILIKE %s ORDER BY score DESC LIMIT %s", - (query, f"%{query}%", limit), - ) - else: # auto - cur.execute( - "SELECT yj_full, yj_code, brand_name, generic_name, " - "category_code, category_name, " - "GREATEST(" - " similarity(brand_name, %s)," - " similarity(generic_name, %s)" - ") AS score " - "FROM drug_master " - "WHERE brand_name ILIKE %s OR generic_name ILIKE %s " - " OR yj_code ILIKE %s OR yj_full ILIKE %s " - "ORDER BY score DESC LIMIT %s", - (query, query, f"%{query}%", f"%{query}%", f"%{query}%", f"%{query}%", limit), - ) - rows = cur.fetchall() - return _dump([ - { - "yj_full": r.get("yj_full", ""), - "yj_code": r.get("yj_code", ""), - "brand": r.get("brand_name", ""), - "generic": r.get("generic_name", ""), - "category": f"{r.get('category_code', '')} {r.get('category_name', '')}".strip(), - "score": float(r.get("score", 0)) if r.get("score") else 0.0, - } - for r in rows - ]) + results = [] + for code, d in MOCK_DRUG_MASTER.items(): + q = query.lower() + if (kind == "brand" and q in d["brand_name"].lower()) or \ + (kind == "generic" and q in d["generic_name"].lower()) or \ + (kind == "yj" and (q in d["yj_code"].lower() or q in d["yj_full"].lower())) or \ + (kind == "auto" and (q in d["brand_name"].lower() or q in d["generic_name"].lower() + or q in d["yj_code"].lower() or q in d["yj_full"].lower())): + results.append({ + "yj_full": d["yj_full"], + "yj_code": d["yj_code"], + "brand": d["brand_name"], + "generic": d["generic_name"], + "category": f"{d['category_code']} {d['category_name']}", + "score": 1.0, + }) + return _dump(results[:limit]) def _tool_list_categories() -> str: - """List all L1/L2 drug categories with drug counts.""" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - "SELECT c.category_code, c.category_name, c.level, " - "COUNT(m.yj_code) AS drug_count " - "FROM drug_category c " - "LEFT JOIN drug_master m ON m.category_code = c.category_code " - "WHERE c.level IN ('L1', 'L2') " - "GROUP BY c.category_code, c.category_name, c.level " - "ORDER BY c.category_code" - ) - rows = cur.fetchall() - return _dump([dict(r) for r in rows]) + return _dump(MOCK_CATEGORIES) def _tool_list_drugs_in_category(l2_code: str, limit_generics: int = 50) -> str: - """List drugs under a specific L2 category code.""" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - "SELECT generic_name, json_agg(json_build_object(" - " 'yj_code', yj_code, 'brand_name', brand_name, 'yj_full', yj_full" - ")) AS brands " - "FROM drug_master " - "WHERE category_code ILIKE %s " - "GROUP BY generic_name " - "ORDER BY generic_name LIMIT %s", - (f"{l2_code}%", limit_generics), - ) - rows = cur.fetchall() - return _dump([{"generic_name": r["generic_name"], "brands": r["brands"]} for r in rows]) + results = [] + seen_generics = set() + for code, d in MOCK_DRUG_MASTER.items(): + if d["category_code"].startswith(l2_code) and d["generic_name"] not in seen_generics: + seen_generics.add(d["generic_name"]) + results.append({ + "generic_name": d["generic_name"], + "brands": [{"yj_code": d["yj_code"], "brand_name": d["brand_name"], "yj_full": d["yj_full"]}], + }) + return _dump(results[:limit_generics]) def _tool_get_drug_master(yj_code: str) -> str: - """Get basic info for a drug by yj_code.""" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - "SELECT * FROM drug_master WHERE yj_code = %s LIMIT 1", - (yj_code,), - ) - row = cur.fetchone() - if not row: + d = MOCK_DRUG_MASTER.get(yj_code) + if not d: return _dump({"error": f"yj_code {yj_code} not found"}) - d = dict(row) - d["_citation"] = f"[出典: {row.get('brand_name', '')} (yj_full={row.get('yj_full', '')}) / 添付文書冒頭]" - return _dump(d) + result = dict(d) + result["_citation"] = f"[出典: {d['brand_name']} (yj_full={d['yj_full']}) / 添付文書冒頭]" + return _dump(result) def _tool_get_drug_interactions( @@ -217,45 +292,22 @@ def _tool_get_drug_interactions( keyword: Optional[str] = None, limit: int = 30, ) -> str: - """Search drug_interaction table.""" - conditions = [] - params = [] - if drug_a_yj: - conditions.append("drug_a_yj = %s") - params.append(drug_a_yj) - if drug_b_yj: - conditions.append("(drug_b_yj = %s OR drug_a_yj = %s)") - params.extend([drug_b_yj, drug_b_yj]) - if severity: - conditions.append("severity = %s") - params.append(severity) - if keyword: - conditions.append("(drug_b_class ILIKE %s OR mechanism ILIKE %s OR clinical_effect ILIKE %s)") - k = f"%{keyword}%" - params.extend([k, k, k]) - - where = " AND ".join(conditions) if conditions else "1=1" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - f"SELECT * FROM drug_interaction WHERE {where} LIMIT %s", - (*params, limit), - ) - rows = cur.fetchall() - return _dump([ - { - "drug_a_yj": r.get("drug_a_yj"), - "drug_b_yj": r.get("drug_b_yj"), - "drug_b_class": r.get("drug_b_class"), - "severity": r.get("severity"), - "mechanism": r.get("mechanism"), - "clinical_effect": r.get("clinical_effect"), - "source_drug_yj": r.get("source_drug_yj"), - "source_section": r.get("source_section"), - "_citation": _citation(r.get("source_drug_yj", ""), r.get("source_section")), - } - for r in rows - ]) + results = [] + for r in MOCK_INTERACTIONS: + if drug_a_yj and r["drug_a_yj"] != drug_a_yj: + continue + if drug_b_yj and r["drug_b_yj"] != drug_b_yj: + continue + if severity and r["severity"] != severity: + continue + if keyword and keyword.lower() not in ( + (r.get("drug_b_class") or "").lower() + + (r.get("mechanism") or "").lower() + + (r.get("clinical_effect") or "").lower() + ): + continue + results.append({**r, "_citation": _citation(r["source_drug_yj"], r["source_section"])}) + return _dump(results[:limit]) def _tool_get_drug_restrictions( @@ -265,42 +317,18 @@ def _tool_get_drug_restrictions( keyword: Optional[str] = None, limit: int = 30, ) -> str: - """Search drug_restriction table.""" - conditions = [] - params = [] - if drug_yj: - conditions.append("drug_yj = %s") - params.append(drug_yj) - if condition_type: - conditions.append("condition_type = %s") - params.append(condition_type) - if severity: - conditions.append("severity = %s") - params.append(severity) - if keyword: - conditions.append("condition_text ILIKE %s") - params.append(f"%{keyword}%") - - where = " AND ".join(conditions) if conditions else "1=1" - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - f"SELECT * FROM drug_restriction WHERE {where} LIMIT %s", - (*params, limit), - ) - rows = cur.fetchall() - return _dump([ - { - "drug_yj": r.get("drug_yj"), - "condition_type": r.get("condition_type"), - "condition_text": r.get("condition_text"), - "condition_params": r.get("condition_params"), - "severity": r.get("severity"), - "source_section": r.get("source_section"), - "_citation": _citation(r.get("drug_yj", ""), r.get("source_section")), - } - for r in rows - ]) + results = [] + for r in MOCK_RESTRICTIONS: + if drug_yj and r["drug_yj"] != drug_yj: + continue + if condition_type and r["condition_type"] != condition_type: + continue + if severity and r["severity"] != severity: + continue + if keyword and keyword.lower() not in (r.get("condition_text") or "").lower(): + continue + results.append({**r, "_citation": _citation(r["drug_yj"], r["source_section"])}) + return _dump(results[:limit]) def _tool_get_drug_dosing( @@ -308,36 +336,14 @@ def _tool_get_drug_dosing( patient_segment: Optional[str] = None, limit: int = 20, ) -> str: - """Search drug_dosing table.""" - conditions = ["drug_yj = %s"] - params = [drug_yj] - if patient_segment: - conditions.append("patient_segment = %s") - params.append(patient_segment) - - where = " AND ".join(conditions) - conn = _get_pg() - with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - f"SELECT * FROM drug_dosing WHERE {where} LIMIT %s", - (*params, limit), - ) - rows = cur.fetchall() - return _dump([ - { - "patient_segment": r.get("patient_segment"), - "segment_params": r.get("segment_params"), - "indication_code": r.get("indication_code"), - "dose_amount": r.get("dose_amount"), - "dose_unit": r.get("dose_unit"), - "frequency": r.get("frequency"), - "duration": r.get("duration"), - "adjustment_text": r.get("adjustment_text"), - "source_section": r.get("source_section"), - "_citation": _citation(drug_yj, r.get("source_section")), - } - for r in rows - ]) + results = [] + for r in MOCK_DOSING: + if r["drug_yj"] != drug_yj: + continue + if patient_segment and r["patient_segment"] != patient_segment: + continue + results.append({**r, "_citation": _citation(drug_yj, r["source_section"])}) + return _dump(results[:limit]) def _tool_search_section_text( @@ -345,141 +351,73 @@ def _tool_search_section_text( section_filter: str = "", limit: int = 30, ) -> str: - """Full-text search in OpenSearch pmda_sections index.""" if not keyword.strip(): return _dump({"keyword": keyword, "total_drugs": 0, "shown": 0, "hits": []}) - size = min(max(1, limit), 100) - body: Dict[str, Any] = { - "size": size, - "_source": ["yj_full", "brand_names", "generic_name", "l2_code", "l2_name", "section_title", "line_num"], - "query": {"bool": {"must": [{"match": {"text": keyword}}]}}, - "collapse": { - "field": "yj_full", - "inner_hits": { - "name": "matches", - "size": 2, - "_source": ["section_title", "line_num"], - "highlight": {"fields": {"text": {"fragment_size": 160, "number_of_fragments": 1}}}, - }, - }, - "aggs": {"total_drugs": {"cardinality": {"field": "yj_full"}}}, - } - if section_filter: - body["query"]["bool"]["filter"] = [ - {"wildcard": {"section_title.raw": f"*{section_filter}*"}} - ] - - client = _get_os() - resp = client.search(index=OS_INDEX, body=body) - total = int(resp["aggregations"]["total_drugs"]["value"]) - + # Simple mock: search through section text hits_out = [] - for h in resp["hits"]["hits"]: - src = h.get("_source") or {} - inner = h.get("inner_hits", {}).get("matches", {}).get("hits", {}).get("hits", []) - matches = [] - seen = set() - for ih in inner: - ih_src = ih.get("_source") or {} - title = ih_src.get("section_title") or "" - if title in seen: + for (yj_full, section_title), text in MOCK_SECTION_TEXT.items(): + if section_filter and section_filter not in section_title: + continue + if keyword.lower() in text.lower(): + drug = None + for d in MOCK_DRUG_MASTER.values(): + if d["yj_full"] == yj_full: + drug = d + break + if not drug: continue - seen.add(title) - hl = ih.get("highlight", {}).get("text", [""]) - matches.append({"section_title": title, "snippet": hl[0] if hl else ""}) - brand = (src.get("brand_names") or [""])[0] - yj_full = src.get("yj_full") or "" - hits_out.append({ - "yj_full": yj_full, - "brand": brand, - "generic": src.get("generic_name") or "", - "l2": f"{src.get('l2_code') or ''} {src.get('l2_name') or ''}".strip(), - "matches": matches, - "_citation_template": f"[出典: {brand} (yj_full={yj_full}) / <該当章>]", - }) + brand = drug["brand_name"] + # Deduplicate by yj_full + existing = [h for h in hits_out if h["yj_full"] == yj_full] + if existing: + existing[0]["matches"].append({ + "section_title": section_title, + "snippet": text[:160], + }) + continue + hits_out.append({ + "yj_full": yj_full, + "brand": brand, + "generic": drug["generic_name"], + "l2": f"{drug['category_code']} {drug['category_name']}", + "matches": [{"section_title": section_title, "snippet": text[:160]}], + "_citation_template": f"[出典: {brand} (yj_full={yj_full}) / <該当章>]", + }) - out = { + return _dump({ "keyword": keyword, "section_filter": section_filter or None, - "total_drugs": total, + "total_drugs": len({h["yj_full"] for h in hits_out}), "shown": len(hits_out), - "hits": hits_out, - } - if total > len(hits_out): - out["_more_count"] = total - len(hits_out) - return _dump(out) + "hits": hits_out[:limit], + }) def _tool_list_drug_chapters(yj_full: str) -> str: - """List all chapter titles for a drug's package insert.""" - client = _get_os() - body = { - "size": 200, - "_source": ["yj_full", "brand_names", "generic_name", "section_title", "line_num"], - "query": {"term": {"yj_full": yj_full}}, - "sort": [{"line_num": {"order": "asc"}}], - } - resp = client.search(index=OS_INDEX, body=body) - hits = resp["hits"]["hits"] - - if not hits: + sections = MOCK_CHAPTERS.get(yj_full) + if not sections: return _dump({"error": f"yj_full {yj_full} の章節が見つかりません。"}) - sections = [] - for h in hits: - src = h.get("_source") or {} - # Calculate text length from _score or use stored field - sections.append({ - "section_title": src.get("section_title", ""), - "line_num": src.get("line_num", 0), - "text_len": 0, # not available from list query - }) + drug = None + for d in MOCK_DRUG_MASTER.values(): + if d["yj_full"] == yj_full: + drug = d + break - head = hits[0].get("_source") or {} return _dump({ "yj_full": yj_full, - "brand": (head.get("brand_names") or [""])[0], - "generic": head.get("generic_name", ""), + "brand": drug["brand_name"] if drug else "", + "generic": drug["generic_name"] if drug else "", "n_sections": len(sections), "sections": sections, }) def _tool_read_drug_chapter(yj_full: str, section_title: str) -> str: - """Read verbatim text of a specific chapter.""" - client = _get_os() - body = { - "size": 1, - "_source": ["text", "section_title"], - "query": { - "bool": { - "must": [ - {"term": {"yj_full": yj_full}}, - {"term": {"section_title.keyword": section_title}}, - ] - } - }, - } - resp = client.search(index=OS_INDEX, body=body) - hits = resp["hits"]["hits"] - - if hits: - text = hits[0].get("_source", {}).get("text", "") - if text: - return text[:8000] - - # Fallback: try match instead of term for section_title - body["query"]["bool"]["must"][1] = {"match_phrase": {"section_title": section_title}} - resp = client.search(index=OS_INDEX, body=body) - hits = resp["hits"]["hits"] - - if hits: - text = hits[0].get("_source", {}).get("text", "") - if text: - return text[:8000] - - # Not found — suggest listing chapters + text = MOCK_SECTION_TEXT.get((yj_full, section_title)) + if text: + return text[:8000] return _dump({ "error": f"section_title {section_title!r} は {yj_full} に存在しません。", "hint": "list_drug_chapters で取得した sections[].section_title をそのまま渡してください。", @@ -490,7 +428,6 @@ def _tool_read_drug_chapter(yj_full: str, section_title: str) -> str: # MCP request handler # --------------------------------------------------------------------------- -# Map tool names to their implementation functions _TOOL_DISPATCH = { "search_drugs": lambda args: _tool_search_drugs( query=args.get("query", ""),