发布时间: 2025-5-16 文章作者: myluzh 分类名称: NOTES 朗读文章
from mcp.server.fastmcp import FastMCP import socket import asyncio # 创建FastMCP实例 mcp = FastMCP("Demo") @mcp.tool() async def ping_port(host: str, port: int, timeout: float = 2.0) -> str: """ 测试指定主机的端口是否可达。 Args: host: 目标主机名或IP地址 port: 要测试的端口号 timeout: 超时时间(秒),默认为2秒 Returns: str: 端口测试结果 """ try: # 创建异步任务 loop = asyncio.get_event_loop() future = loop.run_in_executor(None, _check_port, host, port, timeout) result = await future if result: return f"成功: 主机 {host} 的端口 {port} 可达。" else: return f"失败: 主机 {host} 的端口 {port} 不可达。" except Exception as e: return f"错误: {str(e)}" def _check_port(host: str, port: int, timeout: float) -> bool: """同步函数,检查端口是否开放""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((host, port)) sock.close() return result == 0 except Exception: return False @mcp.tool() async def resolve_domain(domain: str) -> str: """ 将域名解析为IP地址。 Args: domain: 要解析的域名 Returns: str: 解析结果,包含IP地址或错误信息 """ try: # 创建异步任务 loop = asyncio.get_event_loop() future = loop.run_in_executor(None, _resolve_domain, domain) result = await future if result: return f"成功: 域名 {domain} 解析为 {result}" else: return f"失败: 无法解析域名 {domain}" except Exception as e: return f"错误: {str(e)}" def _resolve_domain(domain: str) -> str: """同步函数,解析域名为IP地址""" try: ip_address = socket.gethostbyname(domain) return ip_address except socket.gaierror: return "" @mcp.tool() async def check_listening_ports(ip: str, username: str, password: str, ssh_port: int = 22) -> str: """ 通过SSH连接到服务器,查看主机监听的端口。 Args: ip: 服务器IP地址 username: SSH用户名 password: SSH密码 ssh_port: SSH端口,默认为22 Returns: str: 监听端口列表或错误信息 """ try: # 创建异步任务 loop = asyncio.get_event_loop() future = loop.run_in_executor(None, _check_listening_ports, ip, username, password, ssh_port) result = await future if result: return f"成功: 主机 {ip} 的监听端口:\n{result}" else: return f"失败: 无法获取主机 {ip} 的监听端口信息" except Exception as e: return f"错误: {str(e)}" def _check_listening_ports(ip: str, username: str, password: str, ssh_port: int) -> str: """同步函数,通过SSH连接到服务器并获取监听端口""" try: import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(ip, port=ssh_port, username=username, password=password, timeout=10) # 执行命令获取监听端口 stdin, stdout, stderr = client.exec_command("netstat -tuln | grep LISTEN") output = stdout.read().decode() client.close() return output if output else "未发现监听端口" except Exception as e: return "" @mcp.tool() async def check_firewall_ports(ip: str, username: str, password: str, ssh_port: int = 22) -> str: """ 通过SSH连接到云主机,查看防火墙放行的端口。 Args: ip: 服务器IP地址 username: SSH用户名 password: SSH密码 ssh_port: SSH端口,默认为22 Returns: str: 防火墙放行端口列表或错误信息 """ try: # 创建异步任务 loop = asyncio.get_event_loop() future = loop.run_in_executor(None, _check_firewall_ports, ip, username, password, ssh_port) result = await future if result: return f"成功: 主机 {ip} 的防火墙放行端口:\n{result}" else: return f"失败: 无法获取主机 {ip} 的防火墙放行端口信息" except Exception as e: return f"错误: {str(e)}" def _check_firewall_ports(ip: str, username: str, password: str, ssh_port: int) -> str: """同步函数,通过SSH连接到服务器并获取防火墙放行端口""" try: import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(ip, port=ssh_port, username=username, password=password, timeout=10) # 尝试使用不同的防火墙工具获取放行端口 # 首先尝试iptables stdin, stdout, stderr = client.exec_command("command -v iptables && sudo iptables -L -n") iptables_output = stdout.read().decode() # 尝试firewalld stdin, stdout, stderr = client.exec_command("command -v firewall-cmd && sudo firewall-cmd --list-all") firewalld_output = stdout.read().decode() # 尝试ufw stdin, stdout, stderr = client.exec_command("command -v ufw && sudo ufw status") ufw_output = stdout.read().decode() client.close() result = "" if iptables_output: result += "===== iptables 防火墙规则 =====\n" + iptables_output + "\n" if firewalld_output: result += "===== firewalld 防火墙规则 =====\n" + firewalld_output + "\n" if ufw_output: result += "===== ufw 防火墙规则 =====\n" + ufw_output + "\n" return result if result else "未能获取防火墙规则或无防火墙配置" except Exception as e: return "" @mcp.resource("greeting://{name}") def get_greeting(name: str) -> str: """获取个性化问候语""" return f"Hello, {name}!" if __name__ == "__main__": # 使用SSE传输 mcp.run(transport='sse')
/usr/local/bin/python3 mcp_server.py INFO: Started server process [44456] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) INFO: 127.0.0.1:63410 - "GET /sse HTTP/1.1" 200 OK
发表评论