qwen_agent/skills/voice-notification/scripts/voice_notify.py
2026-04-01 10:26:17 +08:00

82 lines
2.5 KiB
Python

#!/usr/bin/env python3
"""Voice notification script for broadcasting messages to active voice sessions."""
import argparse
import json
import os
import sys
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# Default API endpoint
DEFAULT_API_URL = "http://localhost:8001/api/v3/voice/broadcast"
def broadcast_message(message: str, api_url: str = DEFAULT_API_URL) -> dict:
"""Send a broadcast message to the voice API.
Args:
message: The message content to be spoken
api_url: The API endpoint URL
Returns:
Response dict from the API
"""
bot_id = os.environ.get("BOT_ID", "")
user_identifier = os.environ.get("USER_IDENTIFIER", "")
if not bot_id:
return {"success": False, "error": "BOT_ID environment variable not set"}
if not user_identifier:
return {"success": False, "error": "USER_IDENTIFIER environment variable not set"}
payload = {
"bot_id": bot_id,
"user_identifier": user_identifier,
"message": message
}
req = Request(
api_url,
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST"
)
try:
with urlopen(req, timeout=10) as response:
return json.loads(response.read().decode("utf-8"))
except HTTPError as e:
return {"success": False, "error": f"HTTP {e.code}: {e.reason}"}
except URLError as e:
return {"success": False, "error": f"Connection error: {e.reason}"}
except Exception as e:
return {"success": False, "error": str(e)}
def main():
parser = argparse.ArgumentParser(description="Voice notification broadcast tool")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Broadcast command
broadcast_parser = subparsers.add_parser("broadcast", help="Broadcast a message to active voice session")
broadcast_parser.add_argument("--message", required=True, help="Message content to be spoken")
broadcast_parser.add_argument("--api-url", default=DEFAULT_API_URL, help="API endpoint URL")
args = parser.parse_args()
if args.command == "broadcast":
result = broadcast_message(
message=args.message,
api_url=args.api_url
)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result.get("success") else 1)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()