feat: 重构解析器发布覆盖功能 - 添加forceOverwrite参数支持覆盖已存在解析器 - 前端添加覆盖确认对话框 - 修复lambda中Boolean类型转换错误

This commit is contained in:
q
2026-01-19 11:10:16 +08:00
parent 55c3387415
commit a925731f52
21 changed files with 5630 additions and 389 deletions

View File

@@ -69,6 +69,12 @@
<junit.version>4.13.2</junit.version>
<!-- GraalPy -->
<graalpy.version>24.1.1</graalpy.version>
<!-- 代理配置(可选)- 如不需要代理请保持注释 -->
<!-- <http.proxyHost>127.0.0.1</http.proxyHost>
<http.proxyPort>7890</http.proxyPort>
<https.proxyHost>127.0.0.1</https.proxyHost>
<https.proxyPort>7890</https.proxyPort> -->
</properties>
<dependencies>

View File

@@ -0,0 +1,280 @@
package cn.qaiu.parser.custompy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
/**
* Python代码预处理器
* 用于在运行时自动检测代码中的网络请求导入并动态注入requests_guard猴子补丁
*
* 功能:
* 1. 检测代码中是否导入了 requests、urllib、httpx 等网络请求库
* 2. 如果检测到网络请求库,自动在代码头部注入 requests_guard 猴子补丁
* 3. 生成日志信息供演练场控制台显示
*
* @author <a href="https://qaiu.top">QAIU</a>
*/
public class PyCodePreprocessor {
private static final Logger log = LoggerFactory.getLogger(PyCodePreprocessor.class);
// 检测网络请求库的正则表达式
private static final Pattern IMPORT_REQUESTS = Pattern.compile(
"^\\s*(?:import\\s+requests|from\\s+requests\\b)",
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
);
private static final Pattern IMPORT_URLLIB = Pattern.compile(
"^\\s*(?:import\\s+urllib|from\\s+urllib\\b)",
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
);
private static final Pattern IMPORT_HTTPX = Pattern.compile(
"^\\s*(?:import\\s+httpx|from\\s+httpx\\b)",
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
);
private static final Pattern IMPORT_AIOHTTP = Pattern.compile(
"^\\s*(?:import\\s+aiohttp|from\\s+aiohttp\\b)",
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
);
private static final Pattern IMPORT_SOCKET = Pattern.compile(
"^\\s*(?:import\\s+socket|from\\s+socket\\b)",
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
);
/**
* 预处理Python代码 - 检测并注入猴子补丁
*
* @param originalCode 原始Python代码
* @return 处理后的代码(可能包含注入的补丁)
*/
public static PyPreprocessResult preprocess(String originalCode) {
if (originalCode == null || originalCode.trim().isEmpty()) {
return new PyPreprocessResult(originalCode, false, null, "代码为空,无需预处理");
}
// 检测网络请求库
NetworkLibraryDetection detection = detectNetworkLibraries(originalCode);
if (detection.hasAnyNetworkLibrary()) {
log.debug("检测到网络请求库: {}", detection.getDetectedLibraries());
// 加载猴子补丁代码
String patchCode = loadRequestsGuardPatch();
if (patchCode != null && !patchCode.isEmpty()) {
// 在代码头部注入补丁
String preprocessedCode = injectPatch(originalCode, patchCode);
String logMessage = String.format(
"✓ 网络请求安全拦截已启用 (检测到: %s) | 已动态注入 requests_guard 猴子补丁",
detection.getDetectedLibrariesAsString()
);
log.info(logMessage);
return new PyPreprocessResult(
preprocessedCode,
true,
detection.getDetectedLibraries(),
logMessage
);
} else {
String logMessage = "⚠ 检测到网络请求库但猴子补丁加载失败,请检查资源文件";
log.warn(logMessage);
return new PyPreprocessResult(
originalCode,
false,
detection.getDetectedLibraries(),
logMessage
);
}
} else {
// 没有检测到网络请求库
String logMessage = " 代码中未检测到网络请求库,不需要注入安全拦截补丁";
log.debug(logMessage);
return new PyPreprocessResult(originalCode, false, null, logMessage);
}
}
/**
* 检测代码中使用的网络请求库
*/
private static NetworkLibraryDetection detectNetworkLibraries(String code) {
NetworkLibraryDetection detection = new NetworkLibraryDetection();
if (IMPORT_REQUESTS.matcher(code).find()) {
detection.addLibrary("requests");
}
if (IMPORT_URLLIB.matcher(code).find()) {
detection.addLibrary("urllib");
}
if (IMPORT_HTTPX.matcher(code).find()) {
detection.addLibrary("httpx");
}
if (IMPORT_AIOHTTP.matcher(code).find()) {
detection.addLibrary("aiohttp");
}
if (IMPORT_SOCKET.matcher(code).find()) {
detection.addLibrary("socket");
}
return detection;
}
/**
* 加载requests_guard猴子补丁代码
*/
private static String loadRequestsGuardPatch() {
try {
// 从资源文件加载requests_guard.py
InputStream inputStream = PyCodePreprocessor.class.getClassLoader()
.getResourceAsStream("requests_guard.py");
if (inputStream == null) {
log.warn("无法找到 requests_guard.py 资源文件");
return null;
}
StringBuilder content = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
content.append(line).append("\n");
}
}
return content.toString();
} catch (IOException e) {
log.error("加载 requests_guard.py 失败", e);
return null;
}
}
/**
* 在Python代码头部注入补丁
*
* @param originalCode 原始代码
* @param patchCode 补丁代码
* @return 注入补丁后的代码
*/
private static String injectPatch(String originalCode, String patchCode) {
// 找到第一个非注释、非空行作为注入位置
String[] lines = originalCode.split("\n");
int insertIndex = 0;
// 跳过模块文档字符串和注释
for (int i = 0; i < lines.length; i++) {
String line = lines[i].trim();
// 跳过空行和注释
if (line.isEmpty() || line.startsWith("#")) {
insertIndex = i + 1;
continue;
}
// 跳过模块文档字符串 (""" 或 ''')
if (line.startsWith("\"\"\"") || line.startsWith("'''")) {
// 简单处理:假设文档字符串在单行内或下一行结束
insertIndex = i + 1;
if (line.length() > 3 && !line.endsWith(line.substring(0, 3))) {
continue; // 多行文档字符串,继续跳过
}
}
// 找到第一个有效的代码行
break;
}
// 构建注入后的代码
StringBuilder result = new StringBuilder();
// 添加前面的行
for (int i = 0; i < insertIndex && i < lines.length; i++) {
result.append(lines[i]).append("\n");
}
// 添加补丁代码
result.append("\n# ===== 自动注入的网络请求安全补丁 (由 PyCodePreprocessor 生成) =====\n");
result.append(patchCode);
result.append("\n# ===== 安全补丁结束 =====\n\n");
// 添加剩余的代码
for (int i = insertIndex; i < lines.length; i++) {
result.append(lines[i]);
if (i < lines.length - 1) {
result.append("\n");
}
}
return result.toString();
}
/**
* 预处理结果类
*/
public static class PyPreprocessResult {
private final String processedCode; // 处理后的代码
private final boolean patchInjected; // 是否注入了补丁
private final java.util.List<String> detectedLibraries; // 检测到的库
private final String logMessage; // 日志消息
public PyPreprocessResult(String processedCode, boolean patchInjected,
java.util.List<String> detectedLibraries, String logMessage) {
this.processedCode = processedCode;
this.patchInjected = patchInjected;
this.detectedLibraries = detectedLibraries;
this.logMessage = logMessage;
}
public String getProcessedCode() {
return processedCode;
}
public boolean isPatchInjected() {
return patchInjected;
}
public java.util.List<String> getDetectedLibraries() {
return detectedLibraries;
}
public String getLogMessage() {
return logMessage;
}
}
/**
* 网络库检测结果
*/
private static class NetworkLibraryDetection {
private final java.util.List<String> detectedLibraries = new java.util.ArrayList<>();
void addLibrary(String library) {
if (!detectedLibraries.contains(library)) {
detectedLibraries.add(library);
}
}
boolean hasAnyNetworkLibrary() {
return !detectedLibraries.isEmpty();
}
java.util.List<String> getDetectedLibraries() {
return detectedLibraries;
}
String getDetectedLibrariesAsString() {
return String.join(", ", detectedLibraries);
}
}
}

View File

@@ -6,6 +6,7 @@ import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.PolyglotException;
import org.graalvm.polyglot.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +77,11 @@ public class PyPlaygroundExecutor {
}
playgroundLogger.debugJava("安全检查通过");
// Python代码预处理 - 检测并注入猴子补丁
PyCodePreprocessor.PyPreprocessResult preprocessResult = PyCodePreprocessor.preprocess(pyCode);
playgroundLogger.infoJava(preprocessResult.getLogMessage());
String codeToExecute = preprocessResult.getProcessedCode();
CompletableFuture<String> executionFuture = CompletableFuture.supplyAsync(() -> {
playgroundLogger.infoJava("开始执行parse方法");
@@ -91,7 +97,7 @@ public class PyPlaygroundExecutor {
// 执行Python代码已支持真正的 pip 包如 requests, zlib 等)
playgroundLogger.debugJava("执行Python代码");
context.eval("python", pyCode);
context.eval("python", codeToExecute);
// 调用parse函数
Value parseFunc = bindings.getMember("parse");
@@ -113,6 +119,11 @@ public class PyPlaygroundExecutor {
playgroundLogger.errorJava(errorMsg);
throw new RuntimeException(errorMsg);
}
} catch (PolyglotException e) {
// 处理 Python 语法错误和运行时错误
String errorMsg = formatPolyglotException(e);
playgroundLogger.errorJava("执行parse方法失败: " + errorMsg);
throw new RuntimeException(errorMsg, e);
} catch (Exception e) {
String errorMsg = e.getMessage();
if (errorMsg == null || errorMsg.isEmpty()) {
@@ -164,6 +175,11 @@ public class PyPlaygroundExecutor {
public Future<List<FileInfo>> executeParseFileListAsync() {
Promise<List<FileInfo>> promise = Promise.promise();
// Python代码预处理 - 检测并注入猴子补丁
PyCodePreprocessor.PyPreprocessResult preprocessResult = PyCodePreprocessor.preprocess(pyCode);
playgroundLogger.infoJava(preprocessResult.getLogMessage());
String codeToExecute = preprocessResult.getProcessedCode();
CompletableFuture<List<FileInfo>> executionFuture = CompletableFuture.supplyAsync(() -> {
playgroundLogger.infoJava("开始执行parse_file_list方法");
@@ -177,7 +193,7 @@ public class PyPlaygroundExecutor {
bindings.putMember("crypto", cryptoUtils);
// 执行Python代码已支持真正的 pip 包)
context.eval("python", pyCode);
context.eval("python", codeToExecute);
Value parseFileListFunc = bindings.getMember("parse_file_list");
if (parseFileListFunc == null || !parseFileListFunc.canExecute()) {
@@ -191,6 +207,11 @@ public class PyPlaygroundExecutor {
List<FileInfo> fileList = convertToFileInfoList(result);
playgroundLogger.infoJava("文件列表解析成功,共 " + fileList.size() + " 个文件");
return fileList;
} catch (PolyglotException e) {
// 处理 Python 语法错误和运行时错误
String errorMsg = formatPolyglotException(e);
playgroundLogger.errorJava("执行parse_file_list方法失败: " + errorMsg);
throw new RuntimeException(errorMsg, e);
} catch (Exception e) {
playgroundLogger.errorJava("执行parse_file_list方法失败: " + e.getMessage(), e);
throw new RuntimeException(e);
@@ -229,6 +250,11 @@ public class PyPlaygroundExecutor {
public Future<String> executeParseByIdAsync() {
Promise<String> promise = Promise.promise();
// Python代码预处理 - 检测并注入猴子补丁
PyCodePreprocessor.PyPreprocessResult preprocessResult = PyCodePreprocessor.preprocess(pyCode);
playgroundLogger.infoJava(preprocessResult.getLogMessage());
String codeToExecute = preprocessResult.getProcessedCode();
CompletableFuture<String> executionFuture = CompletableFuture.supplyAsync(() -> {
playgroundLogger.infoJava("开始执行parse_by_id方法");
@@ -242,7 +268,7 @@ public class PyPlaygroundExecutor {
bindings.putMember("crypto", cryptoUtils);
// 执行Python代码已支持真正的 pip 包)
context.eval("python", pyCode);
context.eval("python", codeToExecute);
Value parseByIdFunc = bindings.getMember("parse_by_id");
if (parseByIdFunc == null || !parseByIdFunc.canExecute()) {
@@ -372,4 +398,74 @@ public class PyPlaygroundExecutor {
return null;
}
}
/**
* 格式化 PolyglotException 异常信息,提取详细的错误位置和描述
*/
private String formatPolyglotException(PolyglotException e) {
StringBuilder sb = new StringBuilder();
// 判断是否为语法错误
if (e.isSyntaxError()) {
sb.append("Python语法错误: ");
} else if (e.isGuestException()) {
sb.append("Python运行时错误: ");
} else {
sb.append("Python执行错误: ");
}
// 添加错误消息
String message = e.getMessage();
if (message != null && !message.isEmpty()) {
sb.append(message);
}
// 添加源代码位置信息
if (e.getSourceLocation() != null) {
org.graalvm.polyglot.SourceSection sourceSection = e.getSourceLocation();
sb.append("\n位置: ");
// 文件名(如果有)
if (sourceSection.getSource() != null && sourceSection.getSource().getName() != null) {
sb.append(sourceSection.getSource().getName()).append(", ");
}
// 行号和列号
sb.append("").append(sourceSection.getStartLine()).append("");
if (sourceSection.hasColumns()) {
sb.append(", 第 ").append(sourceSection.getStartColumn()).append("");
}
// 显示出错的代码行(如果可用)
if (sourceSection.hasCharIndex() && sourceSection.getCharacters() != null) {
sb.append("\n错误代码: ").append(sourceSection.getCharacters().toString().trim());
}
}
// 添加堆栈跟踪仅显示Python部分
if (e.isGuestException() && e.getPolyglotStackTrace() != null) {
sb.append("\n\nPython堆栈跟踪:");
boolean foundPythonFrame = false;
for (PolyglotException.StackFrame frame : e.getPolyglotStackTrace()) {
if (frame.isGuestFrame() && frame.getLanguage() != null &&
frame.getLanguage().getId().equals("python")) {
foundPythonFrame = true;
sb.append("\n at ").append(frame.getRootName() != null ? frame.getRootName() : "<unknown>");
if (frame.getSourceLocation() != null) {
org.graalvm.polyglot.SourceSection loc = frame.getSourceLocation();
sb.append(" (");
if (loc.getSource() != null && loc.getSource().getName() != null) {
sb.append(loc.getSource().getName()).append(":");
}
sb.append("line ").append(loc.getStartLine()).append(")");
}
}
}
if (!foundPythonFrame) {
sb.append("\n (无Python堆栈信息)");
}
}
return sb.toString();
}
}

View File

@@ -23,6 +23,10 @@ Python解析器示例
可选实现的函数:
- parse_file_list(share_link_info, http, logger): 解析文件列表,返回文件信息列表
- parse_by_id(share_link_info, http, logger): 根据文件ID解析下载链接
注意事项:
- http、logger、crypto 等对象已在全局注入,无需导入
- 如需使用标准库,直接 import 即可import json, import re
"""

View File

@@ -0,0 +1,310 @@
"""
requests_guard.py - 网络请求安全卫士
对 requests, urllib 等网络库做猴子补丁,阻断本地及危险地址的访问
用法:在程序最早 import 本模块即可全局生效
功能:
1. 拦截 requests 库的所有 HTTP 请求
2. 检测和阻止访问本地地址127.0.0.1, localhost 等)
3. 检测和阻止访问私网地址10.0.0.0, 172.16.0.0, 192.168.0.0, 等)
4. 提供详细的审计日志
作者: QAIU
版本: 1.0.0
"""
import socket
import sys
from urllib.parse import urlparse
# ===== IP 地址判断工具 =====
# 常见内网/危险网段(可按需增删)
PRIVATE_NETS = [
"127.0.0.0/8", # 本地回环
"10.0.0.0/8", # A 类私网
"172.16.0.0/12", # B 类私网
"192.168.0.0/16", # C 类私网
"0.0.0.0/8", # 0.x.x.x
"169.254.0.0/16", # Link-local
"224.0.0.0/4", # 多播地址
"240.0.0.0/4", # 预留地址
]
# 危险端口列表(常见网络服务端口)
DANGEROUS_PORTS = [
22, # SSH
25, # SMTP
53, # DNS
3306, # MySQL
5432, # PostgreSQL
6379, # Redis
8000, 8001, 8080, 8888, # 常见开发服务器端口
27017, # MongoDB
]
def _ip_in_nets(ip_str: str) -> bool:
"""判断 IP 是否落在 PRIVATE_NETS 中的任一 CIDR"""
try:
from ipaddress import ip_address, ip_network
addr = ip_address(ip_str)
return any(addr in ip_network(cidr) for cidr in PRIVATE_NETS)
except (ValueError, ImportError):
# 如果解析失败非IP地址或模块不可用返回False不是私网IP
return False
def _hostname_resolves_to_private(hostname: str) -> bool:
"""解析域名并判断解析结果是否落在私网"""
try:
_, _, ips = socket.gethostbyname_ex(hostname)
return any(_ip_in_nets(ip) for ip in ips)
except (OSError, socket.error):
# 解析失败如网络问题、DNS不可用允许访问不视为私网
# 仅当成功解析且落在私网时才拦截
return False
def _is_dangerous_port(port):
"""判断是否为危险端口"""
return port in DANGEROUS_PORTS
# ===== 日志工具 =====
class GuardLogger:
"""网络请求卫士日志记录器"""
# 用于去重的最近请求缓存(避免重复日志)
_recent_requests = set()
_max_cache_size = 100
@staticmethod
def audit(level, message):
"""输出审计日志"""
timestamp = _get_timestamp()
log_msg = f"[{timestamp}] [Guard-{level}] {message}"
print(log_msg)
# 可以在这里添加文件日志、数据库日志等
sys.stdout.flush()
@staticmethod
def allow(method, url):
"""记录允许的请求(带去重)"""
request_key = f"{method.upper()}:{url}"
if request_key not in GuardLogger._recent_requests:
GuardLogger._recent_requests.add(request_key)
# 限制缓存大小
if len(GuardLogger._recent_requests) > GuardLogger._max_cache_size:
GuardLogger._recent_requests.clear()
GuardLogger.audit("ALLOW", f"{method.upper():6} {url}")
@staticmethod
def block(method, url, reason):
"""记录被阻止的请求"""
GuardLogger.audit("BLOCK", f"{method.upper():6} {url} - {reason}")
def _get_timestamp():
"""获取当前时间戳"""
try:
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
except ImportError:
return ""
# ===== requests 库猴子补丁 =====
def _patch_requests():
"""为 requests 库应用猴子补丁"""
try:
import requests
from requests import models
# 备份原始的 request 方法
_orig_request = requests.api.request
_orig_session_request = requests.Session.request
# 备份高层快捷函数(在修改之前)
_orig_methods = {}
for method in ("get", "post", "put", "patch", "delete", "head", "options"):
_orig_methods[method] = getattr(requests, method, None)
def _safe_request(method, url, **kwargs):
"""安全的 request 包装函数"""
_validate_url(method, url)
GuardLogger.allow(method, url)
return _orig_request(method, url, **kwargs)
def _safe_session_request(self, method, url, **kwargs):
"""安全的 Session.request 包装函数"""
_validate_url(method, url)
GuardLogger.allow(method, url)
return _orig_session_request(self, method, url, **kwargs)
# 应用猴子补丁
requests.api.request = _safe_request
requests.Session.request = _safe_session_request
# 为了兼容高层快捷函数 get/post/...
for method_name, original_method in _orig_methods.items():
if original_method:
# 创建闭包保存当前方法名和原始方法
def make_safe_method(m, orig_func):
def safe_method(url, **kwargs):
_validate_url(m, url)
GuardLogger.allow(m, url)
return orig_func(url, **kwargs)
return safe_method
setattr(requests, method_name, make_safe_method(method_name, original_method))
GuardLogger.audit("INFO", "requests 库猴子补丁加载成功,已启用网络请求安全拦截")
return True
except ImportError:
GuardLogger.audit("DEBUG", "requests 库未安装,跳过补丁")
return False
except Exception as e:
GuardLogger.audit("ERROR", f"requests 库补丁加载失败: {str(e)}")
return False
# ===== urllib 库猴子补丁 =====
def _patch_urllib():
"""为 urllib 库应用猴子补丁"""
try:
import urllib.request
import urllib.error
# 备份原始方法
_orig_urlopen = urllib.request.urlopen
def _safe_urlopen(url, *args, **kwargs):
"""安全的 urlopen 包装函数"""
if isinstance(url, str):
_validate_url("GET", url)
GuardLogger.allow("GET", url)
elif hasattr(url, 'get_full_url'):
# 处理 Request 对象
full_url = url.get_full_url()
_validate_url(url.get_method(), full_url)
GuardLogger.allow(url.get_method(), full_url)
return _orig_urlopen(url, *args, **kwargs)
# 应用猴子补丁
urllib.request.urlopen = _safe_urlopen
GuardLogger.audit("INFO", "urllib 库猴子补丁加载成功")
return True
except ImportError:
GuardLogger.audit("DEBUG", "urllib 库未安装或不可用,跳过补丁")
return False
except Exception as e:
GuardLogger.audit("ERROR", f"urllib 库补丁加载失败: {str(e)}")
return False
# ===== 核心验证函数 =====
def _validate_url(method: str, url: str):
"""验证 URL 是否安全"""
if not isinstance(url, str):
raise ValueError(f"[Guard] 非法 URL 类型:{type(url)}")
if not url or len(url) == 0:
raise ValueError("[Guard] URL 不能为空")
# 解析 URL
try:
parsed = urlparse(url)
except Exception as e:
raise ValueError(f"[Guard] 无法解析 URL{url} - {str(e)}")
scheme = parsed.scheme.lower()
host = parsed.hostname
port = parsed.port
# 检查协议(仅允许 http/https
if scheme not in ("http", "https"):
GuardLogger.block(method, url, f"不允许的协议: {scheme}")
raise PermissionError(f"[Guard] 禁止访问不安全的协议:{scheme}://")
if not host:
GuardLogger.block(method, url, "无法解析主机名")
raise ValueError(f"[Guard] 无法解析 URL 中的主机名:{url}")
# 1. 快速检查本地地址
host_lower = host.lower()
if host_lower in ("localhost", "127.0.0.1", "::1", "[::1]"):
GuardLogger.block(method, url, "本地地址")
raise PermissionError(f"[Guard] 禁止访问本地地址:{url}")
# 2. 检查危险端口
if port and _is_dangerous_port(port):
GuardLogger.block(method, url, f"危险端口 {port}")
raise PermissionError(f"[Guard] 禁止访问危险端口 {port}{url}")
# 3. 检查是否为 IP 地址或解析后落在私网网段
try:
# 判断 host 是否为纯 IP 地址(仅包含数字、点、冒号)
is_ip_format = all(c.isdigit() or c in '.:-[]' for c in host)
if is_ip_format:
# 如果是 IP 格式,检查是否落在私网段
if _ip_in_nets(host):
GuardLogger.block(method, url, "私网IP地址")
raise PermissionError(f"[Guard] 禁止访问私网/危险地址:{url}")
else:
# 如果是域名,解析后检查是否指向私网
if _hostname_resolves_to_private(host):
GuardLogger.block(method, url, "域名解析到私网")
raise PermissionError(f"[Guard] 禁止访问私网/危险地址(域名解析):{url}")
except PermissionError:
raise # 重新抛出 PermissionError
except Exception as e:
# 其他异常(如 DNS 解析异常)允许通过,仅记录警告
GuardLogger.audit("WARN", f"地址检查异常(已允许): {url} - {str(e)}")
# ===== 初始化和全局补丁应用 =====
def apply_all_patches():
"""应用所有网络库的补丁"""
print("[Guard] 正在初始化网络请求安全卫士...")
patches_applied = []
# 应用 requests 补丁
if _patch_requests():
patches_applied.append("requests")
# 应用 urllib 补丁
if _patch_urllib():
patches_applied.append("urllib")
if patches_applied:
msg = f"[Guard] 成功应用 {len(patches_applied)} 个网络库补丁: {', '.join(patches_applied)}"
GuardLogger.audit("INFO", msg)
else:
GuardLogger.audit("WARN", "[Guard] 没有可用的网络库可以补丁")
# ===== 模块初始化 =====
# 在模块加载时自动应用所有补丁
apply_all_patches()
# 暴露公共接口
__all__ = [
'GuardLogger',
'apply_all_patches',
'PRIVATE_NETS',
'DANGEROUS_PORTS',
]