fix(parser): harden runtime resource handling

This commit is contained in:
yukaidi
2026-06-10 21:19:56 +08:00
parent 46c6827eda
commit b6b7f0d8b7
14 changed files with 1498 additions and 455 deletions

View File

@@ -7,12 +7,15 @@ import org.slf4j.LoggerFactory;
import cn.qaiu.parser.custom.CustomParserRegistry;
public class WebClientVertxInit {
private Vertx vertx = null;
private volatile Vertx vertx = null;
private static final WebClientVertxInit INSTANCE = new WebClientVertxInit();
private static final Logger log = LoggerFactory.getLogger(WebClientVertxInit.class);
public static void init(Vertx vx) {
public static synchronized void init(Vertx vx) {
if (vx == null) {
throw new IllegalArgumentException("Vertx instance must not be null");
}
INSTANCE.vertx = vx;
// 自动加载JavaScript解析器脚本
@@ -23,18 +26,10 @@ public class WebClientVertxInit {
}
}
public static Vertx get() {
public static synchronized Vertx get() {
if (INSTANCE.vertx == null) {
log.info("getVertx: Vertx实例不存在, 创建Vertx实例.");
INSTANCE.vertx = Vertx.vertx();
// 如果Vertx实例是新创建的也尝试加载JavaScript脚本
try {
CustomParserRegistry.autoLoadJsScripts();
} catch (Exception e) {
log.warn("自动加载JavaScript解析器脚本失败", e);
}
throw new IllegalStateException("Vertx实例未初始化,请先调用 WebClientVertxInit.init(vertx)");
}
return INSTANCE.vertx;
}
}
}

View File

@@ -1,5 +1,6 @@
package cn.qaiu.parser;//package cn.qaiu.lz.common.parser;
import cn.qaiu.WebClientVertxInit;
import cn.qaiu.entity.FileInfo;
import cn.qaiu.entity.ShareLinkInfo;
import cn.qaiu.parser.clientlink.ClientLinkGeneratorFactory;
@@ -7,10 +8,26 @@ import cn.qaiu.parser.clientlink.ClientLinkType;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.function.Supplier;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public interface IPanTool {
public interface IPanTool extends AutoCloseable {
/** 同步等待超时时间(秒) */
long SYNC_TIMEOUT_SECONDS = 120;
ScheduledExecutorService CLOSE_AFTER_SCHEDULER = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "pan-tool-close-after");
t.setDaemon(true);
return t;
});
/**
* 解析文件
@@ -18,8 +35,72 @@ public interface IPanTool {
*/
Future<String> parse();
static <T> Future<T> closeAfter(IPanTool tool, Supplier<Future<T>> action) {
Promise<T> promise = Promise.promise();
AtomicBoolean cleanupDone = new AtomicBoolean(false);
ScheduledFuture<?> cleanupTask = null;
try {
Future<T> future = action.get();
if (future == null) {
closeQuietly(tool);
return Future.failedFuture("解析器返回空 Future");
}
cleanupTask = CLOSE_AFTER_SCHEDULER.schedule(() -> {
if (cleanupDone.compareAndSet(false, true)) {
closeQuietly(tool);
failOnVertxContext(promise, "解析超时(" + SYNC_TIMEOUT_SECONDS + "秒)");
}
}, SYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledCleanupTask = cleanupTask;
future.onComplete(ar -> {
scheduledCleanupTask.cancel(false);
if (!cleanupDone.compareAndSet(false, true)) {
return;
}
closeQuietly(tool);
if (ar.succeeded()) {
promise.tryComplete(ar.result());
} else {
promise.tryFail(ar.cause());
}
});
return promise.future();
} catch (Throwable t) {
if (cleanupTask != null) {
cleanupTask.cancel(false);
}
closeQuietly(tool);
return Future.failedFuture(t);
}
}
private static <T> void failOnVertxContext(Promise<T> promise, String message) {
try {
WebClientVertxInit.get().runOnContext(ignored -> promise.tryFail(message));
} catch (Exception ignored) {
promise.tryFail(message);
}
}
static void closeQuietly(IPanTool tool) {
if (tool == null) {
return;
}
try {
tool.close();
} catch (Exception ignored) {
// ignore cleanup failures
}
}
static void shutdownCloseAfterScheduler() {
CLOSE_AFTER_SCHEDULER.shutdownNow();
}
default String parseSync() {
return parse().toCompletionStage().toCompletableFuture().join();
return timedJoin(parse());
}
/**
@@ -33,7 +114,7 @@ public interface IPanTool {
}
default List<FileInfo> parseFileListSync() {
return parseFileList().toCompletionStage().toCompletableFuture().join();
return timedJoin(parseFileList());
}
/**
@@ -47,7 +128,7 @@ public interface IPanTool {
}
default String parseByIdSync() {
return parseById().toCompletionStage().toCompletableFuture().join();
return timedJoin(parseById());
}
/**
@@ -126,7 +207,7 @@ public interface IPanTool {
* @return Map<ClientLinkType, String> 客户端下载链接集合
*/
default Map<ClientLinkType, String> parseWithClientLinksSync() {
return parseWithClientLinks().toCompletionStage().toCompletableFuture().join();
return timedJoin(parseWithClientLinks());
}
/**
@@ -137,4 +218,26 @@ public interface IPanTool {
default ShareLinkInfo getShareLinkInfo() {
return null;
}
@Override
default void close() {
// default no-op
}
/**
* 带超时的同步等待工具方法,替代无超时的 join()
*/
private static <T> T timedJoin(Future<T> future) {
try {
return future.toCompletionStage().toCompletableFuture()
.get(SYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
} catch (TimeoutException e) {
throw new RuntimeException("同步等待超时(" + SYNC_TIMEOUT_SECONDS + "秒)", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}
}

View File

@@ -34,25 +34,37 @@ import java.util.zip.GZIPInputStream;
* <p>{网盘标识}Tool, 网盘标识不超过5个字符, 可以取网盘名称首字母缩写或拼音首字母, <br>
* 音乐类型的解析以M开头, 例如网易云音乐Mne</p>
*/
public abstract class PanBase implements IPanTool {
public abstract class PanBase implements IPanTool, Closeable {
protected Logger log = LoggerFactory.getLogger(this.getClass());
protected Promise<String> promise = Promise.promise();
private static final int MAX_COMPRESSED_RESPONSE_BYTES = 8 * 1024 * 1024;
private static final int MAX_DECOMPRESSED_RESPONSE_CHARS = 16 * 1024 * 1024;
private static final int MAX_ERROR_BODY_CHARS = 4096;
/**
* 共享的 WebClient 配置(设置超时避免连接无限期占用)
*/
private static final WebClientOptions SHARED_OPTIONS = new WebClientOptions()
.setConnectTimeout(10000) // 连接超时 10 秒
.setIdleTimeout(30) // 空闲超时 30 秒
.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.SECONDS);
private static final Object SHARED_CLIENT_LOCK = new Object();
/**
* 共享的 WebClient 实例(线程安全,避免每请求创建导致资源泄漏)
*/
private static final WebClient SHARED_CLIENT = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions());
private static final WebClient SHARED_CLIENT_NO_REDIRECTS = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions().setFollowRedirects(false));
private static final WebClient SHARED_CLIENT_DISABLE_UA = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions().setUserAgentEnabled(false));
private static volatile WebClient sharedClient;
private static volatile WebClient sharedClientNoRedirects;
private static volatile WebClient sharedClientDisableUA;
private static volatile boolean sharedClientsShutdown = false;
/**
* Http client (默认使用共享实例,代理模式下使用独立实例)
*/
protected WebClient client = SHARED_CLIENT;
protected WebClient client = sharedClient();
/**
* Http client session (会话管理, 带cookie请求, 每实例独立)
@@ -62,14 +74,25 @@ public abstract class PanBase implements IPanTool {
/**
* Http client 不自动跳转
*/
protected WebClient clientNoRedirects = SHARED_CLIENT_NO_REDIRECTS;
protected WebClient clientNoRedirects = sharedClientNoRedirects();
/**
* Http client disable UserAgent
*/
protected WebClient clientDisableUA = SHARED_CLIENT_DISABLE_UA;
protected WebClient clientDisableUA = sharedClientDisableUA();
protected ShareLinkInfo shareLinkInfo;
/**
* 标记是否为代理模式(代理模式创建的 WebClient 需要手动关闭)
*/
private boolean isProxyMode = false;
/**
* 代理模式下创建的独立 WebClient 实例(需要在 close 时释放)
*/
private WebClient proxyClient = null;
private WebClient proxyClientNoRedirects = null;
/**
@@ -86,6 +109,7 @@ public abstract class PanBase implements IPanTool {
public PanBase(ShareLinkInfo shareLinkInfo) {
this.shareLinkInfo = shareLinkInfo;
if (shareLinkInfo.getOtherParam().containsKey("proxy")) {
this.isProxyMode = true;
JsonObject proxy = (JsonObject) shareLinkInfo.getOtherParam().get("proxy");
ProxyOptions proxyOptions = new ProxyOptions()
.setType(ProxyType.valueOf(proxy.getString("type").toUpperCase()))
@@ -97,22 +121,86 @@ public abstract class PanBase implements IPanTool {
if (StringUtils.isNotEmpty(proxy.getString("password"))) {
proxyOptions.setPassword(proxy.getString("password"));
}
this.client = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions()
// 代理模式下创建独立的 WebClient 实例(应用超时配置)
this.proxyClient = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions(SHARED_OPTIONS)
.setUserAgentEnabled(false)
.setProxyOptions(proxyOptions));
this.proxyClientNoRedirects = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions(SHARED_OPTIONS).setFollowRedirects(false)
.setUserAgentEnabled(false)
.setProxyOptions(proxyOptions));
this.client = proxyClient;
this.clientSession = WebClientSession.create(client);
this.clientNoRedirects = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions().setFollowRedirects(false)
.setUserAgentEnabled(false)
.setProxyOptions(proxyOptions));
this.clientNoRedirects = proxyClientNoRedirects;
}
}
protected PanBase() {
}
private static WebClient sharedClient() {
synchronized (SHARED_CLIENT_LOCK) {
if (sharedClientsShutdown) {
throw new IllegalStateException("共享 WebClient 已关闭");
}
if (sharedClient == null) {
sharedClient = WebClient.create(WebClientVertxInit.get(), new WebClientOptions(SHARED_OPTIONS));
}
return sharedClient;
}
}
private static WebClient sharedClientNoRedirects() {
synchronized (SHARED_CLIENT_LOCK) {
if (sharedClientsShutdown) {
throw new IllegalStateException("共享 WebClient 已关闭");
}
if (sharedClientNoRedirects == null) {
sharedClientNoRedirects = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions(SHARED_OPTIONS).setFollowRedirects(false));
}
return sharedClientNoRedirects;
}
}
private static WebClient sharedClientDisableUA() {
synchronized (SHARED_CLIENT_LOCK) {
if (sharedClientsShutdown) {
throw new IllegalStateException("共享 WebClient 已关闭");
}
if (sharedClientDisableUA == null) {
sharedClientDisableUA = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions(SHARED_OPTIONS).setUserAgentEnabled(false));
}
return sharedClientDisableUA;
}
}
public static void shutdownSharedClients() {
synchronized (SHARED_CLIENT_LOCK) {
sharedClientsShutdown = true;
closeSharedClient(sharedClient, "shared WebClient");
closeSharedClient(sharedClientNoRedirects, "shared WebClientNoRedirects");
closeSharedClient(sharedClientDisableUA, "shared WebClientDisableUA");
sharedClient = null;
sharedClientNoRedirects = null;
sharedClientDisableUA = null;
}
}
private static void closeSharedClient(WebClient client, String name) {
if (client == null) {
return;
}
try {
client.close();
} catch (Exception e) {
LoggerFactory.getLogger(PanBase.class).warn("关闭 {} 失败: {}", name, e.getMessage());
}
}
protected String baseMsg() {
if (shareLinkInfo.getShareUrl() != null) {
return shareLinkInfo.getPanName() + "-" + shareLinkInfo.getType() + ": url=" + shareLinkInfo.getShareUrl();
@@ -137,16 +225,19 @@ public abstract class PanBase implements IPanTool {
return;
}
String s = String.format(errorMsg.replaceAll("\\{}", "%s"), args);
log.error("解析异常: " + s, t.fillInStackTrace());
promise.fail(baseMsg() + ": 解析异常: " + s + " -> " + t);
// 只记录异常消息和类型,不调用 fillInStackTrace 避免产生巨大栈信息
log.error("解析异常: {} - {}: {}", s, t.getClass().getSimpleName(), t.getMessage());
// 只传递异常消息,不传递完整异常对象,减少内存占用
String failMsg = baseMsg() + ": 解析异常: " + s + " -> " + t.getClass().getSimpleName() + ": " + t.getMessage();
promise.fail(failMsg);
} catch (Exception e) {
log.error("ErrorMsg format fail. The parameter has been discarded", e);
log.error("解析异常: " + errorMsg, t.fillInStackTrace());
log.error("解析异常: {} - {}: {}", errorMsg, t.getClass().getSimpleName(), t.getMessage());
if (promise.future().isComplete()) {
log.warn("ErrorMsg format. Promise 已经完成, 无法再次失败: {}", errorMsg);
return;
}
promise.fail(baseMsg() + ": 解析异常: " + errorMsg + " -> " + t);
promise.fail(baseMsg() + ": 解析异常: " + errorMsg + " -> " + t.getClass().getSimpleName() + ": " + t.getMessage());
}
}
@@ -186,7 +277,7 @@ public abstract class PanBase implements IPanTool {
* @return Handler
*/
protected Handler<Throwable> handleFail(String errorMsg) {
return t -> fail(baseMsg() + " - 请求异常 {}: -> {}", errorMsg, t.fillInStackTrace());
return t -> fail(baseMsg() + " - 请求异常 {}: -> {}", errorMsg, t.getClass().getSimpleName() + ": " + t.getMessage());
}
protected Handler<Throwable> handleFail() {
@@ -205,28 +296,22 @@ public abstract class PanBase implements IPanTool {
String contentEncoding = res.getHeader("Content-Encoding");
try {
if ("gzip".equalsIgnoreCase(contentEncoding)) {
// 如果是gzip压缩的响应体解压
return new JsonObject(decompressGzip((Buffer) res.body()));
// 如果是gzip压缩的响应体解压(只解压一次,缓存结果)
String decompressed = decompressGzip((Buffer) res.body());
return new JsonObject(decompressed);
} else {
return res.bodyAsJsonObject();
}
} catch (Exception e) {
if ("gzip".equalsIgnoreCase(contentEncoding)) {
// 如果是gzip压缩的响应体,解压
try {
log.error(decompressGzip((Buffer) res.body()));
fail(decompressGzip((Buffer) res.body()));
//throw new RuntimeException("响应不是JSON格式");
} catch (IOException ex) {
log.error("响应gzip解压失败");
fail("响应gzip解压失败: {}", ex.getMessage());
//throw new RuntimeException("响应gzip解压失败", ex);
}
// gzip解压失败,记录错误
log.error("响应gzip解压或JSON解析失败: {}", e.getMessage());
fail("响应gzip解压或JSON解析失败: {}", e.getMessage());
} else {
log.error("解析失败: json格式异常: {}", res.bodyAsString());
fail("解析失败: json格式异常: {}", res.bodyAsString());
//throw new RuntimeException("解析失败: json格式异常");
String bodyPreview = responseBodyPreview(res);
log.error("解析失败: json格式异常: {}", bodyPreview);
fail("解析失败: json格式异常: {}", bodyPreview);
}
return JsonObject.of();
}
@@ -289,11 +374,15 @@ public abstract class PanBase implements IPanTool {
if (iterator.hasNext()) {
PanDomainTemplate next = iterator.next();
log.debug("规则不匹配, 处理解析器转发: {} -> {}", shareLinkInfo.getPanName(), next.getDisplayName());
ParserCreate.fromType(next.name())
.fromAnyShareUrl(shareLinkInfo.getShareUrl())
.createTool()
.parse()
.onComplete(promise);
try {
IPanTool nextTool = ParserCreate.fromType(next.name())
.fromAnyShareUrl(shareLinkInfo.getShareUrl())
.createTool();
IPanTool.closeAfter(nextTool, nextTool::parse)
.onComplete(promise);
} catch (Exception e) {
fail(e, "转发到下一个解析器失败: {}", next.getDisplayName());
}
} else {
fail("error: 没有下一个解析处理器");
}
@@ -309,6 +398,12 @@ public abstract class PanBase implements IPanTool {
* @throws IOException IOException
*/
private String decompressGzip(Buffer compressedData) throws IOException {
if (compressedData == null) {
return "";
}
if (compressedData.length() > MAX_COMPRESSED_RESPONSE_BYTES) {
throw new IOException("gzip响应体过大: " + compressedData.length() + " bytes");
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.getBytes());
GZIPInputStream gzis = new GZIPInputStream(bais);
InputStreamReader isr = new InputStreamReader(gzis, StandardCharsets.UTF_8);
@@ -317,12 +412,39 @@ public abstract class PanBase implements IPanTool {
char[] buffer = new char[4096];
int n;
while ((n = isr.read(buffer)) != -1) {
writer.write(buffer, 0, n);
writeLimited(writer, buffer, n);
}
return writer.toString();
}
}
private void writeLimited(StringWriter writer, char[] buffer, int len) throws IOException {
if (writer.getBuffer().length() + len > MAX_DECOMPRESSED_RESPONSE_CHARS) {
throw new IOException("gzip解压后响应体过大");
}
writer.write(buffer, 0, len);
}
private String responseBodyPreview(HttpResponse<?> res) {
if (res == null || res.body() == null) {
return "";
}
try {
if (res.body() instanceof Buffer body) {
int length = Math.min(body.length(), MAX_ERROR_BODY_CHARS);
String preview = new String(body.getBytes(0, length), StandardCharsets.UTF_8);
return body.length() > length ? preview + "...(truncated " + body.length() + " bytes)" : preview;
}
String text = res.bodyAsString();
if (text == null || text.length() <= MAX_ERROR_BODY_CHARS) {
return text;
}
return text.substring(0, MAX_ERROR_BODY_CHARS) + "...(truncated " + text.length() + " chars)";
} catch (Exception e) {
return "<body preview failed: " + e.getMessage() + ">";
}
}
protected String getDomainName(){
return shareLinkInfo.getOtherParam().getOrDefault("domainName", "").toString();
}
@@ -331,4 +453,28 @@ public abstract class PanBase implements IPanTool {
public ShareLinkInfo getShareLinkInfo() {
return shareLinkInfo;
}
/**
* 关闭代理模式下创建的 WebClient 资源
* 非代理模式使用共享实例,不需要关闭
*/
@Override
public void close() {
if (isProxyMode) {
try {
if (proxyClient != null) {
proxyClient.close();
}
} catch (Exception e) {
log.warn("关闭代理 WebClient 失败: {}", e.getMessage());
}
try {
if (proxyClientNoRedirects != null) {
proxyClientNoRedirects.close();
}
} catch (Exception e) {
log.warn("关闭代理 WebClientNoRedirects 失败: {}", e.getMessage());
}
}
}
}

View File

@@ -9,6 +9,8 @@ import org.apache.commons.lang3.StringUtils;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.Set;
import java.util.regex.Matcher;
import static cn.qaiu.parser.PanDomainTemplate.KEY;
@@ -23,6 +25,9 @@ import static cn.qaiu.parser.PanDomainTemplate.PWD;
* Create at 2024/9/15 14:10
*/
public class ParserCreate {
private static final Set<PanDomainTemplate> GENERIC_BUILT_IN_PARSERS =
EnumSet.of(PanDomainTemplate.CE, PanDomainTemplate.KD, PanDomainTemplate.OTHER);
private final PanDomainTemplate panDomainTemplate;
private final ShareLinkInfo shareLinkInfo;
@@ -132,12 +137,12 @@ public class ParserCreate {
if (StringUtils.isNotEmpty(pwd)) {
shareLinkInfo.setSharePassword(pwd);
}
standardUrl = standardUrl.replace("{pwd}", pwd);
standardUrl = standardUrl.replace("{pwd}", StringUtils.defaultString(pwd));
} catch (IllegalStateException | IllegalArgumentException ignored) {}
shareLinkInfo.setShareUrl(shareUrl);
shareLinkInfo.setShareKey(shareKey);
if (!(panDomainTemplate.ordinal() >= PanDomainTemplate.CE.ordinal())) {
if (!isGenericBuiltInParser(panDomainTemplate)) {
shareLinkInfo.setStandardUrl(standardUrl);
}
return this;
@@ -197,7 +202,7 @@ public class ParserCreate {
}
// 内置解析器处理
if (panDomainTemplate.ordinal() >= PanDomainTemplate.CE.ordinal()) {
if (isGenericBuiltInParser(panDomainTemplate)) {
// 处理Cloudreve(ce)类: pan.huang1111.cn_s_wDz5TK _ -> /
String[] s = shareKey.split("_");
String standardUrl = "https://" + String.join("/", s);
@@ -247,9 +252,19 @@ public class ParserCreate {
return this;
}
// 根据分享链接获取PanDomainTemplate实例(优先匹配自定义解析器)
// 根据分享链接获取PanDomainTemplate实例
public synchronized static ParserCreate fromShareUrl(String shareUrl) {
// 优先查找支持正则匹配的自定义解析器
if (StringUtils.isBlank(shareUrl)) {
throw new IllegalArgumentException("shareUrl不能为空");
}
shareUrl = shareUrl.trim();
ParserCreate builtInParser = fromBuiltInShareUrl(shareUrl, false);
if (builtInParser != null) {
return builtInParser;
}
// 明确内置解析器未命中时,再查找支持正则匹配的自定义解析器
for (CustomParserConfig customConfig : CustomParserRegistry.getAll().values()) {
if (customConfig.supportsFromShareUrl()) {
Matcher matcher = customConfig.getMatchPattern().matcher(shareUrl);
@@ -295,22 +310,35 @@ public class ParserCreate {
}
}
}
// 查找内置解析器
// 最后再走 Cloudreve/可道云/其他网盘这类泛化兜底,避免抢走自定义解析器
builtInParser = fromBuiltInShareUrl(shareUrl, true);
if (builtInParser != null) {
return builtInParser;
}
throw new IllegalArgumentException("Unsupported share URL");
}
private static ParserCreate fromBuiltInShareUrl(String shareUrl, boolean genericOnly) {
for (PanDomainTemplate panDomainTemplate : PanDomainTemplate.values()) {
boolean genericParser = isGenericBuiltInParser(panDomainTemplate);
if (genericOnly != genericParser) {
continue;
}
if (panDomainTemplate.getPattern().matcher(shareUrl).matches()) {
ShareLinkInfo shareLinkInfo = ShareLinkInfo.newBuilder()
.type(panDomainTemplate.name().toLowerCase())
.panName(panDomainTemplate.getDisplayName())
.shareUrl(shareUrl).build();
if (panDomainTemplate.ordinal() >= PanDomainTemplate.CE.ordinal()) {
if (isGenericBuiltInParser(panDomainTemplate)) {
shareLinkInfo.setStandardUrl(shareUrl);
}
ParserCreate parserCreate = new ParserCreate(panDomainTemplate, shareLinkInfo);
return parserCreate.normalizeShareLink();
}
}
throw new IllegalArgumentException("Unsupported share URL");
return null;
}
// 根据type获取枚举实例优先查找自定义解析器
@@ -353,7 +381,7 @@ public class ParserCreate {
// 自定义解析器处理
if (isCustomParser) {
path = this.shareLinkInfo.getType() + "/" + this.shareLinkInfo.getShareKey();
} else if (panDomainTemplate.ordinal() >= PanDomainTemplate.CE.ordinal()) {
} else if (isGenericBuiltInParser(panDomainTemplate)) {
// 处理Cloudreve(ce)类: pan.huang1111.cn_s_wDz5TK _ -> /
path = this.shareLinkInfo.getType() + "/"
+ this.shareLinkInfo.getShareUrl()
@@ -381,7 +409,11 @@ public class ParserCreate {
public CustomParserConfig getCustomParserConfig() {
return customParserConfig;
}
private static boolean isGenericBuiltInParser(PanDomainTemplate panDomainTemplate) {
return GENERIC_BUILT_IN_PARSERS.contains(panDomainTemplate);
}
/**
* 获取内置解析器模板仅当isCustomParser为false时有效
* @return 内置解析器模板如果是自定义解析器则返回null

View File

@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class CustomParserRegistry {
private static final Logger log = LoggerFactory.getLogger(CustomParserRegistry.class);
private static final int MAX_CUSTOM_PARSERS = Integer.getInteger("parser.custom.maxRegistrySize", 256);
/**
* 存储自定义解析器配置的Mapkey为类型标识value为配置对象
@@ -33,7 +34,7 @@ public class CustomParserRegistry {
* @param config 解析器配置
* @throws IllegalArgumentException 如果type已存在或与内置解析器冲突
*/
public static void register(CustomParserConfig config) {
public static synchronized void register(CustomParserConfig config) {
if (config == null) {
throw new IllegalArgumentException("config不能为空");
}
@@ -59,6 +60,11 @@ public class CustomParserRegistry {
"类型标识 '" + type + "' 已被注册,请先注销或使用其他标识"
);
}
if (CUSTOM_PARSERS.size() >= MAX_CUSTOM_PARSERS) {
throw new IllegalArgumentException(
"自定义解析器数量已达到上限(" + MAX_CUSTOM_PARSERS + "个),请先注销不需要的解析器"
);
}
CUSTOM_PARSERS.put(type, config);
log.info("注册自定义解析器成功: {} ({})", config.getDisplayName(), type);
@@ -171,7 +177,7 @@ public class CustomParserRegistry {
* @param type 解析器类型标识
* @return 是否注销成功
*/
public static boolean unregister(String type) {
public static synchronized boolean unregister(String type) {
if (type == null || type.trim().isEmpty()) {
return false;
}
@@ -213,7 +219,7 @@ public class CustomParserRegistry {
/**
* 清空所有自定义解析器
*/
public static void clear() {
public static synchronized void clear() {
CUSTOM_PARSERS.clear();
}

View File

@@ -2,19 +2,21 @@ package cn.qaiu.parser.customjs;
import cn.qaiu.WebClientVertxInit;
import cn.qaiu.util.HttpResponseHelper;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.client.WebClientSession;
import io.vertx.ext.web.multipart.MultipartForm;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,8 +29,13 @@ import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
/**
@@ -39,11 +46,60 @@ import java.util.regex.Pattern;
* Create at 2025/10/17
*/
public class JsHttpClient {
private static final Logger log = LoggerFactory.getLogger(JsHttpClient.class);
private final WebClient client;
private final WebClientSession clientSession;
private static final int MAX_RESPONSE_BODY_BYTES = 8 * 1024 * 1024;
private static final int MAX_REQUEST_BODY_BYTES = 8 * 1024 * 1024;
private static final int MAX_HEADER_COUNT = 64;
private static final int MAX_HEADER_VALUE_LENGTH = 4096;
private static final int MAX_TIMEOUT_SECONDS = 120;
private static final int MAX_REDIRECTS = 5;
private static final String DEFAULT_ACCEPT_ENCODING = "gzip, deflate, br";
private static final Object SHARED_CLIENT_LOCK = new Object();
// 共享 HttpClient 实例(非代理模式),懒加载避免类初始化阶段抢跑 Vert.x。
private static volatile HttpClient sharedClient;
private static volatile boolean sharedClientShutdown = false;
/**
* 关闭共享 HttpClient应用关闭时调用
*/
public static void shutdownSharedClient() {
synchronized (SHARED_CLIENT_LOCK) {
sharedClientShutdown = true;
if (sharedClient != null) {
sharedClient.close();
sharedClient = null;
}
}
}
private static HttpClient sharedClient() {
synchronized (SHARED_CLIENT_LOCK) {
ensureSharedClientAvailable();
if (sharedClient == null) {
sharedClient = WebClientVertxInit.get().createHttpClient(
new HttpClientOptions()
.setConnectTimeout(10000)
.setIdleTimeout(30)
.setIdleTimeoutUnit(TimeUnit.SECONDS)
.setMaxPoolSize(64));
}
return sharedClient;
}
}
private static void ensureSharedClientAvailable() {
if (sharedClientShutdown) {
throw new IllegalStateException("共享 JavaScript HttpClient 已关闭");
}
}
private final HttpClient client;
private final boolean ownClient; // 标记是否为自建 client需要 close
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Object requestLock = new Object();
private final Set<HttpClientRequest> activeRequests = ConcurrentHashMap.newKeySet();
private MultiMap headers;
private int timeoutSeconds = 30; // 默认超时时间30秒
@@ -61,11 +117,12 @@ public class JsHttpClient {
};
public JsHttpClient() {
this.client = WebClient.create(WebClientVertxInit.get(), new WebClientOptions());
this.clientSession = WebClientSession.create(client);
ensureSharedClientAvailable();
this.client = sharedClient();
this.ownClient = false;
this.headers = MultiMap.caseInsensitiveMultiMap();
// 设置默认的Accept-Encoding头以支持压缩响应
this.headers.set("Accept-Encoding", "gzip, deflate, br, zstd");
this.headers.set("Accept-Encoding", DEFAULT_ACCEPT_ENCODING);
// 设置默认的User-Agent头
this.headers.set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0");
// 设置默认的Accept-Language头
@@ -77,31 +134,35 @@ public class JsHttpClient {
* @param proxyConfig 代理配置JsonObject包含type、host、port、username、password
*/
public JsHttpClient(JsonObject proxyConfig) {
ensureSharedClientAvailable();
if (proxyConfig != null && proxyConfig.containsKey("type")) {
ProxyOptions proxyOptions = new ProxyOptions()
.setType(ProxyType.valueOf(proxyConfig.getString("type").toUpperCase()))
.setHost(proxyConfig.getString("host"))
.setPort(proxyConfig.getInteger("port"));
if (StringUtils.isNotEmpty(proxyConfig.getString("username"))) {
proxyOptions.setUsername(proxyConfig.getString("username"));
}
if (StringUtils.isNotEmpty(proxyConfig.getString("password"))) {
proxyOptions.setPassword(proxyConfig.getString("password"));
}
this.client = WebClient.create(WebClientVertxInit.get(),
new WebClientOptions()
.setUserAgentEnabled(false)
this.client = WebClientVertxInit.get().createHttpClient(
new HttpClientOptions()
.setConnectTimeout(10000)
.setIdleTimeout(30)
.setIdleTimeoutUnit(TimeUnit.SECONDS)
.setMaxPoolSize(16)
.setProxyOptions(proxyOptions));
this.clientSession = WebClientSession.create(client);
this.ownClient = true;
} else {
this.client = WebClient.create(WebClientVertxInit.get());
this.clientSession = WebClientSession.create(client);
this.client = sharedClient();
this.ownClient = false;
}
this.headers = MultiMap.caseInsensitiveMultiMap();
// 设置默认的Accept-Encoding头以支持压缩响应
this.headers.set("Accept-Encoding", "gzip, deflate, br, zstd");
this.headers.set("Accept-Encoding", DEFAULT_ACCEPT_ENCODING);
// 设置默认的User-Agent头
this.headers.set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0");
// 设置默认的Accept-Language头
@@ -183,13 +244,7 @@ public class JsHttpClient {
*/
public JsHttpResponse get(String url) {
validateUrlSecurity(url);
return executeRequest(() -> {
HttpRequest<Buffer> request = client.getAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
return request.send();
});
return executeRequest(HttpMethod.GET, url, null, false);
}
/**
@@ -198,16 +253,25 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse getWithRedirect(String url) {
validateUrlSecurity(url);
return executeRequest(() -> {
HttpRequest<Buffer> request = client.getAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
String currentUrl = url;
for (int redirectCount = 0; redirectCount <= MAX_REDIRECTS; redirectCount++) {
validateUrlSecurity(currentUrl);
JsHttpResponse response = executeRequest(HttpMethod.GET, currentUrl, null, false);
if (!isRedirectStatus(response.statusCode())) {
return response;
}
// 设置跟随重定向
request.followRedirects(true);
return request.send();
});
if (redirectCount == MAX_REDIRECTS) {
throw new RuntimeException("重定向次数超过限制: " + MAX_REDIRECTS);
}
String location = response.header(HttpHeaders.LOCATION.toString());
if (StringUtils.isBlank(location)) {
throw new RuntimeException("重定向响应缺少Location头");
}
currentUrl = resolveRedirectUrl(currentUrl, location);
}
throw new RuntimeException("重定向处理失败");
}
/**
@@ -217,15 +281,7 @@ public class JsHttpClient {
*/
public JsHttpResponse getNoRedirect(String url) {
validateUrlSecurity(url);
return executeRequest(() -> {
HttpRequest<Buffer> request = client.getAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
// 设置不跟随重定向
request.followRedirects(false);
return request.send();
});
return executeRequest(HttpMethod.GET, url, null, false);
}
/**
@@ -236,26 +292,7 @@ public class JsHttpClient {
*/
public JsHttpResponse post(String url, Object data) {
validateUrlSecurity(url);
return executeRequest(() -> {
HttpRequest<Buffer> request = client.postAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
if (data != null) {
if (data instanceof String) {
return request.sendBuffer(Buffer.buffer((String) data));
} else if (data instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> mapData = (Map<String, String>) data;
return request.sendForm(MultiMap.caseInsensitiveMultiMap().addAll(mapData));
} else {
return request.sendJson(data);
}
} else {
return request.send();
}
});
return executeRequest(HttpMethod.POST, url, bodyFromData(data), false);
}
/**
@@ -266,26 +303,7 @@ public class JsHttpClient {
*/
public JsHttpResponse put(String url, Object data) {
validateUrlSecurity(url);
return executeRequest(() -> {
HttpRequest<Buffer> request = client.putAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
if (data != null) {
if (data instanceof String) {
return request.sendBuffer(Buffer.buffer((String) data));
} else if (data instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> mapData = (Map<String, String>) data;
return request.sendForm(MultiMap.caseInsensitiveMultiMap().addAll(mapData));
} else {
return request.sendJson(data);
}
} else {
return request.send();
}
});
return executeRequest(HttpMethod.PUT, url, bodyFromData(data), false);
}
/**
@@ -294,13 +312,8 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse delete(String url) {
return executeRequest(() -> {
HttpRequest<Buffer> request = client.deleteAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
return request.send();
});
validateUrlSecurity(url);
return executeRequest(HttpMethod.DELETE, url, null, false);
}
/**
@@ -310,26 +323,8 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse patch(String url, Object data) {
return executeRequest(() -> {
HttpRequest<Buffer> request = client.patchAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
if (data != null) {
if (data instanceof String) {
return request.sendBuffer(Buffer.buffer((String) data));
} else if (data instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> mapData = (Map<String, String>) data;
return request.sendForm(MultiMap.caseInsensitiveMultiMap().addAll(mapData));
} else {
return request.sendJson(data);
}
} else {
return request.send();
}
});
validateUrlSecurity(url);
return executeRequest(HttpMethod.PATCH, url, bodyFromData(data), false);
}
/**
@@ -340,6 +335,12 @@ public class JsHttpClient {
*/
public JsHttpClient putHeader(String name, String value) {
if (name != null && value != null) {
if (headers.size() >= MAX_HEADER_COUNT && !headers.contains(name)) {
throw new IllegalArgumentException("请求头数量超过限制");
}
if (value.length() > MAX_HEADER_VALUE_LENGTH) {
throw new IllegalArgumentException("请求头过长: " + name);
}
headers.set(name, value);
}
return this;
@@ -353,9 +354,7 @@ public class JsHttpClient {
public JsHttpClient putHeaders(Map<String, String> headersMap) {
if (headersMap != null) {
for (Map.Entry<String, String> entry : headersMap.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
headers.set(entry.getKey(), entry.getValue());
}
putHeader(entry.getKey(), entry.getValue());
}
}
return this;
@@ -380,7 +379,7 @@ public class JsHttpClient {
public JsHttpClient clearHeaders() {
headers.clear();
// 重新设置默认头
headers.set("Accept-Encoding", "gzip, deflate, br, zstd");
headers.set("Accept-Encoding", DEFAULT_ACCEPT_ENCODING);
headers.set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0");
headers.set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6");
return this;
@@ -405,7 +404,7 @@ public class JsHttpClient {
*/
public JsHttpClient setTimeout(int seconds) {
if (seconds > 0) {
this.timeoutSeconds = seconds;
this.timeoutSeconds = Math.min(seconds, MAX_TIMEOUT_SECONDS);
}
return this;
}
@@ -450,19 +449,12 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse sendForm(Map<String, String> data) {
return executeRequest(() -> {
HttpRequest<Buffer> request = client.postAbs("");
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
MultiMap formData = MultiMap.caseInsensitiveMultiMap();
if (data != null) {
formData.addAll(data);
}
return request.sendForm(formData);
});
throw new IllegalArgumentException("sendForm(data) 缺少请求URL请使用 post(url, data)");
}
public JsHttpResponse sendForm(String url, Map<String, String> data) {
validateUrlSecurity(url);
return executeRequest(HttpMethod.POST, url, formBody(data), false);
}
/**
@@ -474,34 +466,8 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse sendMultipartForm(String url, Map<String, Object> data) {
return executeRequest(() -> {
HttpRequest<Buffer> request = client.postAbs(url);
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
MultipartForm form = MultipartForm.create();
if (data != null) {
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String) {
form.attribute(key, (String) value);
} else if (value instanceof byte[]) {
form.binaryFileUpload(key, key, Buffer.buffer((byte[]) value), "application/octet-stream");
} else if (value instanceof Buffer) {
form.binaryFileUpload(key, key, (Buffer) value, "application/octet-stream");
} else if (value != null) {
// 其他类型转换为字符串
form.attribute(key, value.toString());
}
}
}
return request.sendMultipartForm(form);
});
validateUrlSecurity(url);
return executeRequest(HttpMethod.POST, url, multipartBody(data), false);
}
/**
@@ -510,44 +476,103 @@ public class JsHttpClient {
* @return HTTP响应
*/
public JsHttpResponse sendJson(Object data) {
return executeRequest(() -> {
HttpRequest<Buffer> request = client.postAbs("");
if (!headers.isEmpty()) {
request.putHeaders(headers);
}
return request.sendJson(data);
});
throw new IllegalArgumentException("sendJson(data) 缺少请求URL请使用 post(url, data)");
}
public JsHttpResponse sendJson(String url, Object data) {
validateUrlSecurity(url);
return executeRequest(HttpMethod.POST, url, jsonBody(data), false);
}
/**
* 执行HTTP请求同步
*/
private JsHttpResponse executeRequest(RequestExecutor executor) {
private JsHttpResponse executeRequest(HttpMethod method, String url, RequestBody requestBody, boolean followRedirects) {
if (closed.get()) {
throw new IllegalStateException("HTTP客户端已关闭");
}
AtomicReference<HttpClientRequest> requestRef = new AtomicReference<>();
AtomicBoolean abandoned = new AtomicBoolean(false);
try {
Promise<HttpResponse<Buffer>> promise = Promise.promise();
Future<HttpResponse<Buffer>> future = executor.execute();
future.onComplete(result -> {
if (result.succeeded()) {
promise.complete(result.result());
} else {
promise.fail(result.cause());
}
}).onFailure(e -> log.error("HTTP请求失败", e));
Promise<JsHttpResponse> promise = Promise.promise();
// 等待响应完成(使用配置的超时时间)
HttpResponse<Buffer> response = promise.future().toCompletionStage()
RequestOptions options = new RequestOptions()
.setMethod(method)
.setAbsoluteURI(url)
.setFollowRedirects(followRedirects)
.setTimeout(TimeUnit.SECONDS.toMillis(timeoutSeconds))
.setHeaders(MultiMap.caseInsensitiveMultiMap().setAll(headers));
client.request(options).onComplete(ar -> {
if (ar.failed()) {
promise.tryFail(ar.cause());
return;
}
HttpClientRequest request = ar.result();
synchronized (requestLock) {
if (closed.get() || abandoned.get()) {
request.reset();
promise.tryFail("HTTP客户端已关闭");
return;
}
activeRequests.add(request);
requestRef.set(request);
request.exceptionHandler(e -> {
finishRequest(request);
promise.tryFail(e);
});
request.response().onComplete(responseAr -> {
if (responseAr.succeeded()) {
collectResponse(request, responseAr.result(), promise);
} else {
finishRequest(request);
promise.tryFail(responseAr.cause());
}
});
if (closed.get() || abandoned.get()) {
request.reset();
finishRequest(request);
promise.tryFail("HTTP客户端已关闭");
return;
}
if (requestBody == null || requestBody.body() == null) {
request.end().onFailure(e -> {
finishRequest(request);
promise.tryFail(e);
});
} else {
request.headers().set(HttpHeaders.CONTENT_LENGTH, String.valueOf(requestBody.body().length()));
if (StringUtils.isNotEmpty(requestBody.contentType())) {
request.headers().set(HttpHeaders.CONTENT_TYPE, requestBody.contentType());
}
request.end(requestBody.body()).onFailure(e -> {
finishRequest(request);
promise.tryFail(e);
});
}
}
});
return promise.future().toCompletionStage()
.toCompletableFuture()
.get(timeoutSeconds, TimeUnit.SECONDS);
return new JsHttpResponse(response);
} catch (TimeoutException e) {
// RequestOptions timeout 通常会先触发;这里再兜底,避免等待线程返回后请求还在后台下载。
String errorMsg = "HTTP请求超时" + timeoutSeconds + "秒)";
abandoned.set(true);
synchronized (requestLock) {
abortRequest(requestRef);
}
log.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
} catch (Exception e) {
abandoned.set(true);
synchronized (requestLock) {
abortRequest(requestRef);
}
String errorMsg = e.getMessage();
if (errorMsg == null || errorMsg.trim().isEmpty()) {
errorMsg = e.getClass().getSimpleName();
@@ -559,13 +584,196 @@ public class JsHttpClient {
throw new RuntimeException("HTTP请求执行失败: " + errorMsg, e);
}
}
/**
* 请求执行器接口
*/
@FunctionalInterface
private interface RequestExecutor {
Future<HttpResponse<Buffer>> execute();
private static boolean isRedirectStatus(int statusCode) {
return statusCode == 301 || statusCode == 302 || statusCode == 303
|| statusCode == 307 || statusCode == 308;
}
private String resolveRedirectUrl(String currentUrl, String location) {
try {
URI redirectUri = new URI(currentUrl).resolve(location.trim());
String scheme = redirectUri.getScheme();
if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
throw new SecurityException("🔒 安全拦截: 重定向协议不被允许");
}
String redirectUrl = redirectUri.toString();
validateUrlSecurity(redirectUrl);
return redirectUrl;
} catch (SecurityException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("解析重定向地址失败: " + e.getMessage(), e);
}
}
private void collectResponse(HttpClientRequest request, HttpClientResponse response, Promise<JsHttpResponse> promise) {
Buffer body = Buffer.buffer();
AtomicBoolean done = new AtomicBoolean(false);
String contentLengthHeader = response.getHeader(HttpHeaders.CONTENT_LENGTH.toString());
if (StringUtils.isNumeric(contentLengthHeader)) {
long contentLength = Long.parseLong(contentLengthHeader);
if (contentLength > MAX_RESPONSE_BODY_BYTES) {
done.set(true);
request.reset();
finishRequest(request);
promise.tryFail("响应体过大: " + contentLength + " bytes");
return;
}
}
response.exceptionHandler(e -> {
if (done.compareAndSet(false, true)) {
finishRequest(request);
promise.tryFail(e);
}
});
response.handler(chunk -> {
if (done.get()) {
return;
}
if (body.length() + chunk.length() > MAX_RESPONSE_BODY_BYTES) {
if (done.compareAndSet(false, true)) {
request.reset();
finishRequest(request);
promise.tryFail("响应体过大: " + (body.length() + chunk.length()) + " bytes");
}
return;
}
body.appendBuffer(chunk);
});
response.endHandler(v -> {
if (done.compareAndSet(false, true)) {
finishRequest(request);
promise.tryComplete(new JsHttpResponse(
response.statusCode(),
MultiMap.caseInsensitiveMultiMap().setAll(response.headers()),
body,
response.statusMessage(),
null
));
}
});
response.resume();
}
private void finishRequest(HttpClientRequest request) {
if (request != null) {
activeRequests.remove(request);
}
}
private void abortRequest(AtomicReference<HttpClientRequest> requestRef) {
HttpClientRequest request = requestRef.get();
if (request != null) {
try {
request.reset();
} finally {
finishRequest(request);
}
}
}
private RequestBody bodyFromData(Object data) {
if (data == null) {
return null;
}
if (data instanceof String str) {
return plainTextBody(str);
}
if (data instanceof Buffer buffer) {
return limitedBody(buffer, null);
}
if (data instanceof byte[] bytes) {
return limitedBody(Buffer.buffer(bytes), null);
}
if (data instanceof Map<?, ?> map) {
Map<String, String> formMap = new HashMap<>();
map.forEach((key, value) -> {
if (key != null && value != null) {
formMap.put(String.valueOf(key), String.valueOf(value));
}
});
return formBody(formMap);
}
return jsonBody(data);
}
private RequestBody plainTextBody(String data) {
return limitedBody(Buffer.buffer(data, StandardCharsets.UTF_8.name()), null);
}
private RequestBody jsonBody(Object data) {
Buffer body = data == null ? Buffer.buffer() : Buffer.buffer(Json.encode(data), StandardCharsets.UTF_8.name());
return limitedBody(body, "application/json; charset=utf-8");
}
private RequestBody formBody(Map<String, String> data) {
StringBuilder encoded = new StringBuilder();
if (data != null) {
for (Map.Entry<String, String> entry : data.entrySet()) {
if (encoded.length() > 0) {
encoded.append('&');
}
encoded.append(urlEncode(entry.getKey()));
encoded.append('=');
encoded.append(urlEncode(entry.getValue()));
}
}
return limitedBody(Buffer.buffer(encoded.toString(), StandardCharsets.UTF_8.name()),
"application/x-www-form-urlencoded; charset=utf-8");
}
private RequestBody multipartBody(Map<String, Object> data) {
String boundary = "----NetdiskJsHttpClientBoundary" + UUID.randomUUID().toString().replace("-", "");
Buffer body = Buffer.buffer();
if (data != null) {
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (key == null || value == null) {
continue;
}
appendAscii(body, "--" + boundary + "\r\n");
if (value instanceof byte[] bytes) {
appendAscii(body, "Content-Disposition: form-data; name=\"" + escapeMultipart(key)
+ "\"; filename=\"" + escapeMultipart(key) + "\"\r\n");
appendAscii(body, "Content-Type: application/octet-stream\r\n\r\n");
body.appendBytes(bytes);
appendAscii(body, "\r\n");
} else if (value instanceof Buffer buffer) {
appendAscii(body, "Content-Disposition: form-data; name=\"" + escapeMultipart(key)
+ "\"; filename=\"" + escapeMultipart(key) + "\"\r\n");
appendAscii(body, "Content-Type: application/octet-stream\r\n\r\n");
body.appendBuffer(buffer);
appendAscii(body, "\r\n");
} else {
appendAscii(body, "Content-Disposition: form-data; name=\"" + escapeMultipart(key) + "\"\r\n\r\n");
body.appendString(String.valueOf(value), StandardCharsets.UTF_8.name());
appendAscii(body, "\r\n");
}
ensureRequestBodyLimit(body);
}
}
appendAscii(body, "--" + boundary + "--\r\n");
return limitedBody(body, "multipart/form-data; boundary=" + boundary);
}
private static void appendAscii(Buffer body, String value) {
body.appendString(value, StandardCharsets.US_ASCII.name());
}
private static String escapeMultipart(String value) {
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}
private static RequestBody limitedBody(Buffer body, String contentType) {
ensureRequestBodyLimit(body);
return new RequestBody(body, contentType);
}
private record RequestBody(Buffer body, String contentType) {
}
/**
@@ -573,10 +781,29 @@ public class JsHttpClient {
*/
public static class JsHttpResponse {
private final HttpResponse<Buffer> response;
private final int statusCode;
private final MultiMap headers;
private final Buffer body;
private final String statusMessage;
private final HttpResponse<Buffer> originalResponse;
public JsHttpResponse(HttpResponse<Buffer> response) {
this.response = response;
this(
response.statusCode(),
MultiMap.caseInsensitiveMultiMap().setAll(response.headers()),
response.body(),
response.statusMessage(),
response
);
}
public JsHttpResponse(int statusCode, MultiMap headers, Buffer body, String statusMessage,
HttpResponse<Buffer> originalResponse) {
this.statusCode = statusCode;
this.headers = headers == null ? MultiMap.caseInsensitiveMultiMap() : headers;
this.body = body == null ? Buffer.buffer() : body;
this.statusMessage = statusMessage;
this.originalResponse = originalResponse;
}
/**
@@ -584,7 +811,7 @@ public class JsHttpClient {
* @return 响应体字符串
*/
public String body() {
return HttpResponseHelper.asText(response);
return HttpResponseHelper.asText(body, header(HttpHeaders.CONTENT_ENCODING.toString()));
}
/**
@@ -593,7 +820,7 @@ public class JsHttpClient {
*/
public Object json() {
try {
JsonObject jsonObject = HttpResponseHelper.asJson(response);
JsonObject jsonObject = HttpResponseHelper.asJson(body, header(HttpHeaders.CONTENT_ENCODING.toString()));
if (jsonObject == null || jsonObject.isEmpty()) {
return null;
}
@@ -611,7 +838,7 @@ public class JsHttpClient {
* @return 状态码
*/
public int statusCode() {
return response.statusCode();
return statusCode;
}
/**
@@ -620,7 +847,7 @@ public class JsHttpClient {
* @return 头值
*/
public String header(String name) {
return response.getHeader(name);
return headers.get(name);
}
/**
@@ -628,10 +855,9 @@ public class JsHttpClient {
* @return 响应头Map
*/
public Map<String, String> headers() {
MultiMap responseHeaders = response.headers();
Map<String, String> result = new HashMap<>();
for (String name : responseHeaders.names()) {
result.put(name, responseHeaders.get(name));
for (String name : headers.names()) {
result.put(name, headers.get(name));
}
return result;
}
@@ -649,8 +875,14 @@ public class JsHttpClient {
* 获取原始响应对象
* @return HttpResponse对象
*/
@Deprecated
public HttpResponse<Buffer> getOriginalResponse() {
return response;
if (originalResponse == null) {
throw new UnsupportedOperationException(
"流式HTTP客户端不再保留原始Vert.x HttpResponse请使用statusCode/header/headers/body/bodyBytes方法"
);
}
return originalResponse;
}
/**
@@ -658,11 +890,8 @@ public class JsHttpClient {
* @return 响应体字节数组
*/
public byte[] bodyBytes() {
Buffer buffer = response.body();
if (buffer == null) {
return new byte[0];
}
return buffer.getBytes();
ensureResponseBodyLimit(body);
return body.getBytes();
}
/**
@@ -670,20 +899,46 @@ public class JsHttpClient {
* @return 响应体大小(字节)
*/
public long bodySize() {
Buffer buffer = response.body();
if (buffer == null) {
return 0;
}
return buffer.length();
return body.length();
}
public String statusMessage() {
return statusMessage;
}
}
/**
* 关闭 WebClient 释放连接池资源
* 关闭 HttpClient 释放连接池资源
* 仅关闭自建的 client代理模式共享实例不关闭
*/
public void close() {
if (client != null) {
if (!closed.compareAndSet(false, true)) {
return;
}
synchronized (requestLock) {
for (HttpClientRequest request : activeRequests) {
try {
request.reset();
} catch (Exception e) {
log.debug("重置 JavaScript HTTP 请求失败: {}", e.getMessage());
}
}
activeRequests.clear();
}
if (ownClient && client != null) {
client.close();
}
}
private static void ensureResponseBodyLimit(Buffer buffer) {
if (buffer != null && buffer.length() > MAX_RESPONSE_BODY_BYTES) {
throw new IllegalArgumentException("响应体过大: " + buffer.length() + " bytes");
}
}
private static void ensureRequestBodyLimit(Buffer buffer) {
if (buffer != null && buffer.length() > MAX_REQUEST_BODY_BYTES) {
throw new IllegalArgumentException("请求体过大: " + buffer.length() + " bytes");
}
}
}

View File

@@ -6,6 +6,7 @@ import cn.qaiu.entity.ShareLinkInfo;
import cn.qaiu.parser.IPanTool;
import cn.qaiu.parser.custom.CustomParserConfig;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import org.openjdk.nashorn.api.scripting.NashornScriptEngineFactory;
@@ -20,6 +21,13 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -35,16 +43,40 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
private static volatile WorkerExecutor EXECUTOR;
private static final Object EXECUTOR_LOCK = new Object();
private static volatile boolean executorShutdown = false;
private static String FETCH_RUNTIME_JS = null;
/** 安全网调度器:当 onComplete 未触发时,延迟强制释放资源 */
private static final ScheduledExecutorService CLEANUP_SCHEDULER =
Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "js-parser-cleanup-safety");
t.setDaemon(true);
return t;
});
private static final long EXECUTION_TIMEOUT_SECONDS = 30;
private static final int MAX_RESULT_STRING_LENGTH = 1024 * 1024;
private static final int MAX_FILE_LIST_SIZE = 1000;
private static final int MAX_FILE_FIELD_LENGTH = 4096;
private static final int MAX_CONCURRENT_EXECUTIONS =
Math.max(1, Integer.getInteger("parser.custom.js.maxConcurrentExecutions", 32));
private static final Semaphore EXECUTION_PERMITS = new Semaphore(MAX_CONCURRENT_EXECUTIONS);
private static volatile String FETCH_RUNTIME_JS = null;
private final CustomParserConfig config;
private final ShareLinkInfo shareLinkInfo;
private final ScriptEngine engine;
private volatile ScriptEngine engine;
private final Object engineLock = new Object();
private final JsHttpClient httpClient;
private final JsLogger jsLogger;
private final JsShareLinkInfoWrapper shareLinkInfoWrapper;
private final JsFetchBridge fetchBridge;
/** 标记是否已释放,防止重复关闭 */
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Object lifecycleLock = new Object();
private volatile boolean running = false;
private volatile boolean closeRequested = false;
/** 安全网定时任务句柄,正常完成时取消 */
private volatile ScheduledFuture<?> safetyCleanupFuture = null;
public JsParserExecutor(ShareLinkInfo shareLinkInfo, CustomParserConfig config) {
this.config = config;
@@ -60,7 +92,6 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
this.jsLogger = new JsLogger("JsParser-" + config.getType());
this.shareLinkInfoWrapper = new JsShareLinkInfoWrapper(shareLinkInfo);
this.fetchBridge = new JsFetchBridge(httpClient);
this.engine = initEngine();
}
/**
@@ -112,6 +143,7 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
if (engine == null) {
throw new RuntimeException("无法创建JavaScript引擎请确保Nashorn可用");
}
this.engine = engine;
// 注入Java对象到JavaScript环境
engine.put("http", httpClient);
@@ -146,45 +178,124 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
throw new RuntimeException("JavaScript引擎初始化失败: " + e.getMessage(), e);
}
}
private ScriptEngine engine() {
ScriptEngine current = engine;
if (current != null) {
return current;
}
synchronized (engineLock) {
if (closed.get()) {
throw new IllegalStateException("JavaScript解析器已关闭");
}
if (engine == null) {
engine = initEngine();
}
return engine;
}
}
private void beginExecution() {
synchronized (lifecycleLock) {
if (closed.get() || closeRequested) {
throw new IllegalStateException("JavaScript解析器已关闭");
}
if (running) {
throw new IllegalStateException("JavaScript解析器已在运行");
}
running = true;
}
}
private void finishExecution() {
synchronized (lifecycleLock) {
running = false;
if (closeRequested) {
doClose();
}
}
}
/**
* 释放资源ScriptEngine 和 HttpClient避免内存泄漏
* 幂等:可安全多次调用
*/
@Override
public void close() {
synchronized (lifecycleLock) {
closeRequested = true;
cancelSafetyCleanup();
if (running || closed.get()) {
closeExternalResources();
return;
}
doClose();
}
}
private void doClose() {
if (!closed.compareAndSet(false, true)) return;
closeRequested = false;
closeExternalResources();
cleanupEngine();
}
private void closeExternalResources() {
if (httpClient != null) {
httpClient.close();
}
// 清除 ScriptEngine 持有的 Java 对象引用,帮助 GC 回收
}
private void cleanupEngine() {
// 清除 ScriptEngine 持有的所有引用和内部状态,帮助 GC 回收
if (engine != null) {
engine.put("http", null);
engine.put("logger", null);
engine.put("shareLinkInfo", null);
engine.put("JavaFetch", null);
try {
engine.put("http", null);
engine.put("logger", null);
engine.put("shareLinkInfo", null);
engine.put("JavaFetch", null);
// 彻底清除 ENGINE_SCOPE bindings释放 JS AST、编译函数、闭包等运行时状态
var bindings = engine.getBindings(javax.script.ScriptContext.ENGINE_SCOPE);
if (bindings != null) {
bindings.clear();
}
} catch (Exception e) {
log.warn("清理 ScriptEngine bindings 失败: {}", e.getMessage());
}
}
}
private void cancelSafetyCleanup() {
// 取消安全网定时任务(如果正常完成则无需再触发)
if (safetyCleanupFuture != null) {
safetyCleanupFuture.cancel(false);
safetyCleanupFuture = null;
}
}
/**
* 关闭全局 WorkerExecutor应在应用关闭时调用
* 关闭全局 WorkerExecutor 和清理调度器(应在应用关闭时调用)
*/
public static void shutdownExecutor() {
synchronized (EXECUTOR_LOCK) {
executorShutdown = true;
if (EXECUTOR != null) {
EXECUTOR.close();
EXECUTOR = null;
log.info("JsParserExecutor WorkerExecutor 已关闭");
}
}
CLEANUP_SCHEDULER.shutdown();
}
/**
* 获取或创建 WorkerExecutor懒加载
*/
private static WorkerExecutor getExecutor() {
if (EXECUTOR != null) {
return EXECUTOR;
}
synchronized (EXECUTOR_LOCK) {
if (executorShutdown) {
throw new IllegalStateException("JavaScript解析器 WorkerExecutor 已关闭");
}
if (EXECUTOR == null) {
EXECUTOR = WebClientVertxInit.get().createSharedWorkerExecutor("parser-executor", 32);
}
@@ -192,98 +303,159 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
}
}
private <T> Future<T> executeBlockingWithPermit(String operation, Callable<T> blockingCode) {
if (!EXECUTION_PERMITS.tryAcquire()) {
String message = "JavaScript " + operation + " 执行并发已满,请稍后重试";
jsLogger.error(message);
close();
return Future.failedFuture(message);
}
try {
return getExecutor().executeBlocking(() -> {
boolean executionStarted = false;
try {
beginExecution();
executionStarted = true;
return blockingCode.call();
} finally {
if (executionStarted) {
finishExecution();
}
EXECUTION_PERMITS.release();
}
});
} catch (Throwable e) {
EXECUTION_PERMITS.release();
close();
return Future.failedFuture(e);
}
}
private <T> Future<T> withTimeout(Future<T> executionFuture, String operation) {
Promise<T> promise = Promise.promise();
try {
safetyCleanupFuture = CLEANUP_SCHEDULER.schedule(() -> {
if (promise.tryFail("JavaScript " + operation + " 执行超时(" + EXECUTION_TIMEOUT_SECONDS + "秒)")) {
jsLogger.error("{} 执行超时已停止外部HTTP资源ScriptEngine将在执行线程退出后清理", operation);
close();
}
}, EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("安全网调度失败: {}", e.getMessage());
}
executionFuture.onComplete(ar -> {
cancelSafetyCleanup();
if (ar.succeeded()) {
promise.tryComplete(ar.result());
} else {
promise.tryFail(ar.cause());
}
close();
});
return promise.future();
}
@Override
public Future<String> parse() {
jsLogger.info("开始执行JavaScript解析器: {}", config.getType());
// 使用executeBlocking在工作线程上执行避免阻塞EventLoop线程
return getExecutor().executeBlocking(() -> {
Future<String> executionFuture = executeBlockingWithPermit("parse", () -> {
ScriptEngine engine = engine();
// 直接调用全局parse函数
Object parseFunction = engine.get("parse");
if (parseFunction == null) {
throw new RuntimeException("JavaScript代码中未找到parse函数");
}
if (parseFunction instanceof ScriptObjectMirror parseMirror) {
Object result = parseMirror.call(null, shareLinkInfoWrapper, httpClient, jsLogger);
if (result instanceof String) {
jsLogger.info("解析成功: {}", result);
return (String) result;
String resultText = limitResultString((String) result, "parse");
jsLogger.info("解析成功,结果长度: {}", resultText.length());
return resultText;
} else {
jsLogger.error("parse方法返回值类型错误期望String实际: {}",
jsLogger.error("parse方法返回值类型错误期望String实际: {}",
result != null ? result.getClass().getSimpleName() : "null");
throw new RuntimeException("parse方法返回值类型错误");
}
} else {
throw new RuntimeException("parse函数类型错误");
}
}).onComplete(ar -> close());
});
return withTimeout(executionFuture, "parse");
}
@Override
public Future<List<FileInfo>> parseFileList() {
jsLogger.info("开始执行JavaScript文件列表解析: {}", config.getType());
// 使用executeBlocking在工作线程上执行避免阻塞EventLoop线程
return getExecutor().executeBlocking(() -> {
Future<List<FileInfo>> executionFuture = executeBlockingWithPermit("parseFileList", () -> {
ScriptEngine engine = engine();
// 直接调用全局parseFileList函数
Object parseFileListFunction = engine.get("parseFileList");
if (parseFileListFunction == null) {
throw new RuntimeException("JavaScript代码中未找到parseFileList函数");
}
// 调用parseFileList方法
if (parseFileListFunction instanceof ScriptObjectMirror parseFileListMirror) {
Object result = parseFileListMirror.call(null, shareLinkInfoWrapper, httpClient, jsLogger);
if (result instanceof ScriptObjectMirror resultMirror) {
List<FileInfo> fileList = convertToFileInfoList(resultMirror);
jsLogger.info("文件列表解析成功,共 {} 个文件", fileList.size());
return fileList;
} else {
jsLogger.error("parseFileList方法返回值类型错误期望数组实际: {}",
jsLogger.error("parseFileList方法返回值类型错误期望数组实际: {}",
result != null ? result.getClass().getSimpleName() : "null");
throw new RuntimeException("parseFileList方法返回值类型错误");
}
} else {
throw new RuntimeException("parseFileList函数类型错误");
}
}).onComplete(ar -> close());
});
return withTimeout(executionFuture, "parseFileList");
}
@Override
public Future<String> parseById() {
jsLogger.info("开始执行JavaScript按ID解析: {}", config.getType());
// 使用executeBlocking在工作线程上执行避免阻塞EventLoop线程
return getExecutor().executeBlocking(() -> {
Future<String> executionFuture = executeBlockingWithPermit("parseById", () -> {
ScriptEngine engine = engine();
// 直接调用全局parseById函数
Object parseByIdFunction = engine.get("parseById");
if (parseByIdFunction == null) {
throw new RuntimeException("JavaScript代码中未找到parseById函数");
}
// 调用parseById方法
if (parseByIdFunction instanceof ScriptObjectMirror parseByIdMirror) {
Object result = parseByIdMirror.call(null, shareLinkInfoWrapper, httpClient, jsLogger);
if (result instanceof String) {
jsLogger.info("按ID解析成功: {}", result);
return (String) result;
String resultText = limitResultString((String) result, "parseById");
jsLogger.info("按ID解析成功结果长度: {}", resultText.length());
return resultText;
} else {
jsLogger.error("parseById方法返回值类型错误期望String实际: {}",
jsLogger.error("parseById方法返回值类型错误期望String实际: {}",
result != null ? result.getClass().getSimpleName() : "null");
throw new RuntimeException("parseById方法返回值类型错误");
}
} else {
throw new RuntimeException("parseById函数类型错误");
}
}).onComplete(ar -> close());
});
return withTimeout(executionFuture, "parseById");
}
/**
@@ -293,6 +465,9 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
List<FileInfo> fileList = new ArrayList<>();
if (resultMirror.isArray()) {
if (resultMirror.size() > MAX_FILE_LIST_SIZE) {
throw new RuntimeException("文件列表数量超过限制: " + resultMirror.size());
}
for (int i = 0; i < resultMirror.size(); i++) {
Object item = resultMirror.get(String.valueOf(i));
if (item instanceof ScriptObjectMirror) {
@@ -316,13 +491,13 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
// 设置基本字段
if (itemMirror.hasMember("fileName")) {
fileInfo.setFileName(itemMirror.getMember("fileName").toString());
fileInfo.setFileName(limitField(itemMirror.getMember("fileName")));
}
if (itemMirror.hasMember("fileId")) {
fileInfo.setFileId(itemMirror.getMember("fileId").toString());
fileInfo.setFileId(limitField(itemMirror.getMember("fileId")));
}
if (itemMirror.hasMember("fileType")) {
fileInfo.setFileType(itemMirror.getMember("fileType").toString());
fileInfo.setFileType(limitField(itemMirror.getMember("fileType")));
}
if (itemMirror.hasMember("size")) {
Object size = itemMirror.getMember("size");
@@ -331,16 +506,16 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
}
}
if (itemMirror.hasMember("sizeStr")) {
fileInfo.setSizeStr(itemMirror.getMember("sizeStr").toString());
fileInfo.setSizeStr(limitField(itemMirror.getMember("sizeStr")));
}
if (itemMirror.hasMember("createTime")) {
fileInfo.setCreateTime(itemMirror.getMember("createTime").toString());
fileInfo.setCreateTime(limitField(itemMirror.getMember("createTime")));
}
if (itemMirror.hasMember("updateTime")) {
fileInfo.setUpdateTime(itemMirror.getMember("updateTime").toString());
fileInfo.setUpdateTime(limitField(itemMirror.getMember("updateTime")));
}
if (itemMirror.hasMember("createBy")) {
fileInfo.setCreateBy(itemMirror.getMember("createBy").toString());
fileInfo.setCreateBy(limitField(itemMirror.getMember("createBy")));
}
if (itemMirror.hasMember("downloadCount")) {
Object downloadCount = itemMirror.getMember("downloadCount");
@@ -349,16 +524,16 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
}
}
if (itemMirror.hasMember("fileIcon")) {
fileInfo.setFileIcon(itemMirror.getMember("fileIcon").toString());
fileInfo.setFileIcon(limitField(itemMirror.getMember("fileIcon")));
}
if (itemMirror.hasMember("panType")) {
fileInfo.setPanType(itemMirror.getMember("panType").toString());
fileInfo.setPanType(limitField(itemMirror.getMember("panType")));
}
if (itemMirror.hasMember("parserUrl")) {
fileInfo.setParserUrl(itemMirror.getMember("parserUrl").toString());
fileInfo.setParserUrl(limitField(itemMirror.getMember("parserUrl")));
}
if (itemMirror.hasMember("previewUrl")) {
fileInfo.setPreviewUrl(itemMirror.getMember("previewUrl").toString());
fileInfo.setPreviewUrl(limitField(itemMirror.getMember("previewUrl")));
}
return fileInfo;
@@ -368,4 +543,22 @@ public class JsParserExecutor implements IPanTool, AutoCloseable {
return null;
}
}
private static String limitResultString(String value, String operation) {
if (value.length() > MAX_RESULT_STRING_LENGTH) {
throw new RuntimeException(operation + " 返回结果过大: " + value.length() + " 字符");
}
return value;
}
private static String limitField(Object value) {
if (value == null) {
return null;
}
String text = value.toString();
if (text.length() > MAX_FILE_FIELD_LENGTH) {
throw new RuntimeException("文件字段过长: " + text.length() + " 字符");
}
return text;
}
}

View File

@@ -21,20 +21,37 @@ import java.util.concurrent.*;
*
* @author <a href="https://qaiu.top">QAIU</a>
*/
public class JsPlaygroundExecutor {
public class JsPlaygroundExecutor implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(JsPlaygroundExecutor.class);
// JavaScript执行超时时间
private static final long EXECUTION_TIMEOUT_SECONDS = 30;
private static final int MAX_RESULT_STRING_LENGTH = 1024 * 1024;
private static final int MAX_FILE_LIST_SIZE = 1000;
private static final int MAX_FILE_FIELD_LENGTH = 4096;
private static final int TIMEOUT_LOG_RETAIN = 50;
// 使用独立的线程池,不受Vert.x的BlockedThreadChecker监控
private static final ExecutorService INDEPENDENT_EXECUTOR = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r);
thread.setName("playground-independent-" + System.currentTimeMillis());
thread.setDaemon(true); // 设置为守护线程,服务关闭时自动清理
return thread;
});
// 使用有界线程池,防止线程无限增长导致内存溢出
private static final int POOL_MAX_THREADS = 16;
private static final int POOL_QUEUE_CAPACITY = 256;
private static final ExecutorService INDEPENDENT_EXECUTOR = new ThreadPoolExecutor(
4, POOL_MAX_THREADS, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(POOL_QUEUE_CAPACITY),
r -> {
Thread thread = new Thread(r);
thread.setName("playground-independent-" + thread.getId());
thread.setDaemon(true);
return thread;
},
(r, executor) -> {
// 拒绝策略:记录日志并抛出异常,避免阻塞 Vert.x EventLoop
log.warn("演练场线程池已满,拒绝任务。活跃线程: {}, 队列大小: {}",
((ThreadPoolExecutor) executor).getActiveCount(),
((ThreadPoolExecutor) executor).getQueue().size());
throw new java.util.concurrent.RejectedExecutionException("演练场线程池已满,请稍后重试");
}
);
// 超时调度线程池,用于处理超时中断
private static final ScheduledExecutorService TIMEOUT_SCHEDULER = Executors.newScheduledThreadPool(2, r -> {
@@ -43,14 +60,29 @@ public class JsPlaygroundExecutor {
thread.setDaemon(true);
return thread;
});
/**
* 关闭静态线程池(应在应用关闭时调用)
*/
public static void shutdownPools() {
INDEPENDENT_EXECUTOR.shutdown();
TIMEOUT_SCHEDULER.shutdown();
log.info("JsPlaygroundExecutor 线程池已关闭");
}
private final ShareLinkInfo shareLinkInfo;
private final String jsCode;
private final ScriptEngine engine;
private volatile ScriptEngine engine;
private final Object engineLock = new Object();
private final JsHttpClient httpClient;
private final JsPlaygroundLogger playgroundLogger;
private final JsShareLinkInfoWrapper shareLinkInfoWrapper;
private final JsFetchBridge fetchBridge;
/** 标记是否已释放,防止重复关闭 */
private volatile boolean closed = false;
private final Object lifecycleLock = new Object();
private volatile boolean running = false;
private volatile boolean closeRequested = false;
/**
* 创建演练场执行器
@@ -72,7 +104,6 @@ public class JsPlaygroundExecutor {
this.playgroundLogger = new JsPlaygroundLogger();
this.shareLinkInfoWrapper = new JsShareLinkInfoWrapper(shareLinkInfo);
this.fetchBridge = new JsFetchBridge(httpClient);
this.engine = initEngine();
}
/**
@@ -89,6 +120,7 @@ public class JsPlaygroundExecutor {
if (engine == null) {
throw new RuntimeException("无法创建JavaScript引擎请确保Nashorn可用");
}
this.engine = engine;
// 注入Java对象到JavaScript环境
engine.put("http", httpClient);
@@ -133,9 +165,14 @@ public class JsPlaygroundExecutor {
*/
public Future<String> executeParseAsync() {
Promise<String> promise = Promise.promise();
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
CompletableFuture<String> executionFuture = CompletableFuture.supplyAsync(() -> {
final CompletableFuture<String> executionFuture;
try {
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
executionFuture = CompletableFuture.supplyAsync(() -> {
beginExecution();
try {
ScriptEngine engine = engine();
playgroundLogger.infoJava("开始执行parse方法");
try {
Object parseFunction = engine.get("parse");
@@ -151,8 +188,9 @@ public class JsPlaygroundExecutor {
log.debug("[JsPlaygroundExecutor] parse函数执行完成当前日志数量: {}", playgroundLogger.size());
if (result instanceof String) {
playgroundLogger.infoJava("解析成功,返回结果: " + result);
return (String) result;
String resultText = limitResultString((String) result, "parse");
playgroundLogger.infoJava("解析成功,返回结果长度: " + resultText.length());
return resultText;
} else {
String errorMsg = "parse方法返回值类型错误期望String实际: " +
(result != null ? result.getClass().getSimpleName() : "null");
@@ -167,25 +205,27 @@ public class JsPlaygroundExecutor {
playgroundLogger.errorJava("执行parse方法失败: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}, INDEPENDENT_EXECUTOR);
// 创建超时任务,强制取消执行
ScheduledFuture<?> timeoutTask = TIMEOUT_SCHEDULER.schedule(() -> {
if (!executionFuture.isDone()) {
executionFuture.cancel(true); // 强制中断执行线程
playgroundLogger.errorJava("执行超时,已强制中断");
log.warn("JavaScript执行超时已强制取消");
} finally {
finishExecution();
}
}, EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}, INDEPENDENT_EXECUTOR);
} catch (java.util.concurrent.RejectedExecutionException e) {
log.warn("演练场线程池已满,任务被拒绝");
close(); // 释放已创建的 ScriptEngine 和 HttpClient 资源
promise.fail(new RuntimeException("演练场线程池已满,请稍后重试", e));
return promise.future();
}
ScheduledFuture<?> timeoutTask = scheduleTimeout(executionFuture, "parse");
// 处理执行结果
executionFuture.whenComplete((result, error) -> {
// 取消超时任务
timeoutTask.cancel(false);
if (error != null) {
if (error instanceof CancellationException) {
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已强制中断";
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已返回超时并停止外部HTTP资源ScriptEngine将在执行线程退出后清理";
playgroundLogger.errorJava(timeoutMsg);
log.error(timeoutMsg);
promise.fail(new RuntimeException(timeoutMsg));
@@ -197,10 +237,10 @@ public class JsPlaygroundExecutor {
promise.complete(result);
}
});
return promise.future();
}
/**
* 执行parseFileList方法异步带超时控制
* 使用独立线程池不受Vert.x BlockedThreadChecker监控
@@ -209,9 +249,14 @@ public class JsPlaygroundExecutor {
*/
public Future<List<FileInfo>> executeParseFileListAsync() {
Promise<List<FileInfo>> promise = Promise.promise();
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
CompletableFuture<List<FileInfo>> executionFuture = CompletableFuture.supplyAsync(() -> {
final CompletableFuture<List<FileInfo>> executionFuture;
try {
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
executionFuture = CompletableFuture.supplyAsync(() -> {
beginExecution();
try {
ScriptEngine engine = engine();
playgroundLogger.infoJava("开始执行parseFileList方法");
try {
Object parseFileListFunction = engine.get("parseFileList");
@@ -242,25 +287,27 @@ public class JsPlaygroundExecutor {
playgroundLogger.errorJava("执行parseFileList方法失败: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}, INDEPENDENT_EXECUTOR);
// 创建超时任务,强制取消执行
ScheduledFuture<?> timeoutTask = TIMEOUT_SCHEDULER.schedule(() -> {
if (!executionFuture.isDone()) {
executionFuture.cancel(true); // 强制中断执行线程
playgroundLogger.errorJava("执行超时,已强制中断");
log.warn("JavaScript执行超时已强制取消");
} finally {
finishExecution();
}
}, EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}, INDEPENDENT_EXECUTOR);
} catch (java.util.concurrent.RejectedExecutionException e) {
log.warn("演练场线程池已满,任务被拒绝");
close(); // 释放已创建的 ScriptEngine 和 HttpClient 资源
promise.fail(new RuntimeException("演练场线程池已满,请稍后重试", e));
return promise.future();
}
ScheduledFuture<?> timeoutTask = scheduleTimeout(executionFuture, "parseFileList");
// 处理执行结果
executionFuture.whenComplete((result, error) -> {
// 取消超时任务
timeoutTask.cancel(false);
if (error != null) {
if (error instanceof CancellationException) {
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已强制中断";
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已返回超时并停止外部HTTP资源ScriptEngine将在执行线程退出后清理";
playgroundLogger.errorJava(timeoutMsg);
log.error(timeoutMsg);
promise.fail(new RuntimeException(timeoutMsg));
@@ -272,10 +319,10 @@ public class JsPlaygroundExecutor {
promise.complete(result);
}
});
return promise.future();
}
/**
* 执行parseById方法异步带超时控制
* 使用独立线程池不受Vert.x BlockedThreadChecker监控
@@ -284,9 +331,14 @@ public class JsPlaygroundExecutor {
*/
public Future<String> executeParseByIdAsync() {
Promise<String> promise = Promise.promise();
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
CompletableFuture<String> executionFuture = CompletableFuture.supplyAsync(() -> {
final CompletableFuture<String> executionFuture;
try {
// 使用独立的ExecutorService执行避免Vert.x的BlockedThreadChecker输出警告
executionFuture = CompletableFuture.supplyAsync(() -> {
beginExecution();
try {
ScriptEngine engine = engine();
playgroundLogger.infoJava("开始执行parseById方法");
try {
Object parseByIdFunction = engine.get("parseById");
@@ -300,8 +352,9 @@ public class JsPlaygroundExecutor {
Object result = parseByIdMirror.call(null, shareLinkInfoWrapper, httpClient, playgroundLogger);
if (result instanceof String) {
playgroundLogger.infoJava("按ID解析成功: " + result);
return (String) result;
String resultText = limitResultString((String) result, "parseById");
playgroundLogger.infoJava("按ID解析成功返回结果长度: " + resultText.length());
return resultText;
} else {
String errorMsg = "parseById方法返回值类型错误期望String实际: " +
(result != null ? result.getClass().getSimpleName() : "null");
@@ -316,25 +369,27 @@ public class JsPlaygroundExecutor {
playgroundLogger.errorJava("执行parseById方法失败: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}, INDEPENDENT_EXECUTOR);
// 创建超时任务,强制取消执行
ScheduledFuture<?> timeoutTask = TIMEOUT_SCHEDULER.schedule(() -> {
if (!executionFuture.isDone()) {
executionFuture.cancel(true); // 强制中断执行线程
playgroundLogger.errorJava("执行超时,已强制中断");
log.warn("JavaScript执行超时已强制取消");
} finally {
finishExecution();
}
}, EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}, INDEPENDENT_EXECUTOR);
} catch (java.util.concurrent.RejectedExecutionException e) {
log.warn("演练场线程池已满,任务被拒绝");
close(); // 释放已创建的 ScriptEngine 和 HttpClient 资源
promise.fail(new RuntimeException("演练场线程池已满,请稍后重试", e));
return promise.future();
}
ScheduledFuture<?> timeoutTask = scheduleTimeout(executionFuture, "parseById");
// 处理执行结果
executionFuture.whenComplete((result, error) -> {
// 取消超时任务
timeoutTask.cancel(false);
if (error != null) {
if (error instanceof CancellationException) {
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已强制中断";
String timeoutMsg = "JavaScript执行超时超过" + EXECUTION_TIMEOUT_SECONDS + "秒),已返回超时并停止外部HTTP资源ScriptEngine将在执行线程退出后清理";
playgroundLogger.errorJava(timeoutMsg);
log.error(timeoutMsg);
promise.fail(new RuntimeException(timeoutMsg));
@@ -346,7 +401,7 @@ public class JsPlaygroundExecutor {
promise.complete(result);
}
});
return promise.future();
}
@@ -373,6 +428,9 @@ public class JsPlaygroundExecutor {
List<FileInfo> fileList = new ArrayList<>();
if (resultMirror.isArray()) {
if (resultMirror.size() > MAX_FILE_LIST_SIZE) {
throw new RuntimeException("文件列表数量超过限制: " + resultMirror.size());
}
for (int i = 0; i < resultMirror.size(); i++) {
Object item = resultMirror.get(String.valueOf(i));
if (item instanceof ScriptObjectMirror) {
@@ -396,13 +454,13 @@ public class JsPlaygroundExecutor {
// 设置基本字段
if (itemMirror.hasMember("fileName")) {
fileInfo.setFileName(itemMirror.getMember("fileName").toString());
fileInfo.setFileName(limitField(itemMirror.getMember("fileName")));
}
if (itemMirror.hasMember("fileId")) {
fileInfo.setFileId(itemMirror.getMember("fileId").toString());
fileInfo.setFileId(limitField(itemMirror.getMember("fileId")));
}
if (itemMirror.hasMember("fileType")) {
fileInfo.setFileType(itemMirror.getMember("fileType").toString());
fileInfo.setFileType(limitField(itemMirror.getMember("fileType")));
}
if (itemMirror.hasMember("size")) {
Object size = itemMirror.getMember("size");
@@ -411,16 +469,16 @@ public class JsPlaygroundExecutor {
}
}
if (itemMirror.hasMember("sizeStr")) {
fileInfo.setSizeStr(itemMirror.getMember("sizeStr").toString());
fileInfo.setSizeStr(limitField(itemMirror.getMember("sizeStr")));
}
if (itemMirror.hasMember("createTime")) {
fileInfo.setCreateTime(itemMirror.getMember("createTime").toString());
fileInfo.setCreateTime(limitField(itemMirror.getMember("createTime")));
}
if (itemMirror.hasMember("updateTime")) {
fileInfo.setUpdateTime(itemMirror.getMember("updateTime").toString());
fileInfo.setUpdateTime(limitField(itemMirror.getMember("updateTime")));
}
if (itemMirror.hasMember("createBy")) {
fileInfo.setCreateBy(itemMirror.getMember("createBy").toString());
fileInfo.setCreateBy(limitField(itemMirror.getMember("createBy")));
}
if (itemMirror.hasMember("downloadCount")) {
Object downloadCount = itemMirror.getMember("downloadCount");
@@ -429,24 +487,154 @@ public class JsPlaygroundExecutor {
}
}
if (itemMirror.hasMember("fileIcon")) {
fileInfo.setFileIcon(itemMirror.getMember("fileIcon").toString());
fileInfo.setFileIcon(limitField(itemMirror.getMember("fileIcon")));
}
if (itemMirror.hasMember("panType")) {
fileInfo.setPanType(itemMirror.getMember("panType").toString());
fileInfo.setPanType(limitField(itemMirror.getMember("panType")));
}
if (itemMirror.hasMember("parserUrl")) {
fileInfo.setParserUrl(itemMirror.getMember("parserUrl").toString());
fileInfo.setParserUrl(limitField(itemMirror.getMember("parserUrl")));
}
if (itemMirror.hasMember("previewUrl")) {
fileInfo.setPreviewUrl(itemMirror.getMember("previewUrl").toString());
fileInfo.setPreviewUrl(limitField(itemMirror.getMember("previewUrl")));
}
return fileInfo;
} catch (Exception e) {
playgroundLogger.errorJava("转换FileInfo对象失败", e);
return null;
}
}
private static String limitResultString(String value, String operation) {
if (value.length() > MAX_RESULT_STRING_LENGTH) {
throw new RuntimeException(operation + " 返回结果过大: " + value.length() + " 字符");
}
return value;
}
private static String limitField(Object value) {
if (value == null) {
return null;
}
String text = value.toString();
if (text.length() > MAX_FILE_FIELD_LENGTH) {
throw new RuntimeException("文件字段过长: " + text.length() + " 字符");
}
return text;
}
private void beginExecution() {
synchronized (lifecycleLock) {
if (closed) {
throw new CancellationException("演练场执行器已关闭");
}
if (running) {
throw new IllegalStateException("演练场执行器已在运行");
}
running = true;
}
}
private ScriptEngine engine() {
ScriptEngine current = engine;
if (current != null) {
return current;
}
synchronized (engineLock) {
if (closed) {
throw new CancellationException("演练场执行器已关闭");
}
if (engine == null) {
engine = initEngine();
}
return engine;
}
}
private void finishExecution() {
synchronized (lifecycleLock) {
running = false;
if (closeRequested) {
doClose();
}
}
}
private ScheduledFuture<?> scheduleTimeout(CompletableFuture<?> executionFuture, String operation) {
// cancel(true) 只能请求中断Nashorn 死循环不保证立即停止。
return TIMEOUT_SCHEDULER.schedule(() -> {
if (!executionFuture.isDone()) {
executionFuture.cancel(true);
playgroundLogger.errorJava(operation + " 执行超时已请求取消并停止外部HTTP资源");
forceCloseAfterTimeout();
log.warn("JavaScript {} 执行超时已请求取消Nashorn长循环可能继续占用线程ScriptEngine将在执行线程退出后清理", operation);
}
}, EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
private void forceCloseAfterTimeout() {
synchronized (lifecycleLock) {
closeRequested = true;
if (running || closed) {
closeExternalResources();
} else {
doClose();
}
}
playgroundLogger.trimToLast(TIMEOUT_LOG_RETAIN);
}
/**
* 释放资源HttpClient 和 ScriptEngine避免内存泄漏
* 幂等:可安全多次调用
*/
@Override
public void close() {
synchronized (lifecycleLock) {
closeRequested = true;
if (running || closed) {
closeExternalResources();
return;
}
doClose();
}
}
private void doClose() {
if (closed) return;
closed = true;
closeRequested = false;
closeExternalResources();
cleanupEngine();
log.debug("JsPlaygroundExecutor 资源已释放");
}
private void closeExternalResources() {
if (httpClient != null) {
httpClient.close();
}
}
private void cleanupEngine() {
// 清除 ScriptEngine 的所有 bindings释放 JS 运行时引用
if (engine != null) {
try {
// 清除注入的 Java 对象引用
engine.put("http", null);
engine.put("logger", null);
engine.put("shareLinkInfo", null);
engine.put("JavaFetch", null);
// 清除所有 ENGINE_SCOPE bindings包括 eval 加载的 JS 函数
var bindings = engine.getBindings(javax.script.ScriptContext.ENGINE_SCOPE);
if (bindings != null) {
bindings.clear();
}
} catch (Exception e) {
log.warn("清理 ScriptEngine bindings 失败: {}", e.getMessage());
}
}
}
}

View File

@@ -20,6 +20,7 @@ public class JsPlaygroundLogger {
// 使用线程安全的列表
private static final int MAX_LOG_SIZE = 1000;
private static final int MAX_LOG_MESSAGE_LENGTH = 4096;
private final List<LogEntry> logs = Collections.synchronizedList(new ArrayList<>());
/**
@@ -62,7 +63,11 @@ public class JsPlaygroundLogger {
if (obj == null) {
return "null";
}
return obj.toString();
String msg = obj.toString();
if (msg.length() <= MAX_LOG_MESSAGE_LENGTH) {
return msg;
}
return msg.substring(0, MAX_LOG_MESSAGE_LENGTH) + "...[truncated]";
}
/**
@@ -127,7 +132,7 @@ public class JsPlaygroundLogger {
public void error(Object message, Throwable throwable) {
String msg = toString(message);
if (throwable != null) {
msg = msg + ": " + throwable.getMessage();
msg = toString(msg + ": " + throwable.getMessage());
}
addLog(new LogEntry("ERROR", msg, "JS"));
log.debug("[JSPlaygroundLogger] ERROR: {}", msg);
@@ -167,9 +172,9 @@ public class JsPlaygroundLogger {
* 错误日志带异常供Java层调用
*/
public void errorJava(String message, Throwable throwable) {
String msg = message;
String msg = toString(message);
if (throwable != null) {
msg = msg + ": " + throwable.getMessage();
msg = toString(msg + ": " + throwable.getMessage());
}
addLog(new LogEntry("ERROR", msg, "JAVA"));
log.debug("[JAVAPlaygroundLogger] ERROR: {}", msg);
@@ -197,4 +202,17 @@ public class JsPlaygroundLogger {
public void clear() {
logs.clear();
}
public void trimToLast(int maxEntries) {
if (maxEntries < 0) {
throw new IllegalArgumentException("maxEntries不能小于0");
}
synchronized (logs) {
int removeCount = logs.size() - maxEntries;
if (removeCount <= 0) {
return;
}
logs.subList(0, removeCount).clear();
}
}
}

View File

@@ -31,6 +31,8 @@ public class JsScriptLoader {
private static final String RESOURCE_PATH = "custom-parsers";
private static final String EXTERNAL_PATH = "./custom-parsers";
private static final long MAX_SCRIPT_SIZE_BYTES = 128 * 1024;
private static final int MAX_EXTERNAL_SCRIPT_COUNT = 100;
// 系统属性配置的外部目录路径
private static final String EXTERNAL_PATH_PROPERTY = "parser.custom-parsers.path";
@@ -81,14 +83,16 @@ public class JsScriptLoader {
try {
InputStream inputStream = JsScriptLoader.class.getClassLoader()
.getResourceAsStream(resourceFile);
if (inputStream != null) {
String jsCode = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode);
configs.add(config);
String fileName = resourceFile.substring(resourceFile.lastIndexOf('/') + 1);
log.debug("从资源目录加载脚本: {}", fileName);
try (inputStream) {
String jsCode = readResourceScript(inputStream, resourceFile);
CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode);
configs.add(config);
String fileName = resourceFile.substring(resourceFile.lastIndexOf('/') + 1);
log.debug("从资源目录加载脚本: {}", fileName);
}
}
} catch (Exception e) {
log.warn("加载资源脚本失败: {}", resourceFile, e);
@@ -207,8 +211,10 @@ public class JsScriptLoader {
paths.filter(Files::isRegularFile)
.filter(path -> path.toString().endsWith(".js"))
.filter(path -> !isExcludedFile(path.getFileName().toString()))
.limit(MAX_EXTERNAL_SCRIPT_COUNT)
.forEach(path -> {
try {
ensureScriptSize(path);
String jsCode = Files.readString(path, StandardCharsets.UTF_8);
CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode);
configs.add(config);
@@ -262,6 +268,7 @@ public class JsScriptLoader {
throw new IllegalArgumentException("文件不存在: " + filePath);
}
ensureScriptSize(path);
String jsCode = Files.readString(path, StandardCharsets.UTF_8);
return JsScriptMetadataParser.parseScript(jsCode);
@@ -279,14 +286,16 @@ public class JsScriptLoader {
try {
InputStream inputStream = JsScriptLoader.class.getClassLoader()
.getResourceAsStream(resourcePath);
if (inputStream == null) {
throw new IllegalArgumentException("资源文件不存在: " + resourcePath);
}
String jsCode = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
return JsScriptMetadataParser.parseScript(jsCode);
try (inputStream) {
String jsCode = readResourceScript(inputStream, resourcePath);
return JsScriptMetadataParser.parseScript(jsCode);
}
} catch (IOException e) {
throw new RuntimeException("读取资源文件失败: " + resourcePath, e);
}
@@ -346,4 +355,19 @@ public class JsScriptLoader {
fileName.contains(".test.") ||
fileName.contains(".spec.");
}
private static void ensureScriptSize(Path path) throws IOException {
long size = Files.size(path);
if (size > MAX_SCRIPT_SIZE_BYTES) {
throw new IllegalArgumentException("JavaScript脚本超过128KB限制: " + path.getFileName());
}
}
private static String readResourceScript(InputStream inputStream, String name) throws IOException {
byte[] bytes = inputStream.readNBytes((int) MAX_SCRIPT_SIZE_BYTES + 1);
if (bytes.length > MAX_SCRIPT_SIZE_BYTES) {
throw new IllegalArgumentException("JavaScript资源脚本超过128KB限制: " + name);
}
return new String(bytes, StandardCharsets.UTF_8);
}
}

View File

@@ -17,16 +17,35 @@ import java.util.zip.InflaterInputStream;
public class HttpResponseHelper {
static Logger LOGGER = LoggerFactory.getLogger(HttpResponseHelper.class);
private static final int MAX_RESPONSE_BODY_BYTES = 8 * 1024 * 1024;
private static final int MAX_DECOMPRESSED_CHARS = 16 * 1024 * 1024;
// -------------------- 公共方法 --------------------
public static String asText(HttpResponse<?> res) {
String encoding = res.getHeader(HttpHeaders.CONTENT_ENCODING.toString());
try {
Buffer body = toBuffer(res);
return asText(body, encoding);
} catch (IllegalArgumentException | UnsupportedOperationException e) {
throw e;
} catch (Exception e) {
LOGGER.error("asText: {}", e.getMessage(), e);
return null;
}
}
public static String asText(Buffer body, String encoding) {
try {
if (body == null) {
return "";
}
ensureBodyLimit(body);
if (encoding == null || "identity".equalsIgnoreCase(encoding)) {
return body.toString(StandardCharsets.UTF_8);
}
return decompress(body, encoding);
} catch (IllegalArgumentException | UnsupportedOperationException e) {
throw e;
} catch (Exception e) {
LOGGER.error("asText: {}", e.getMessage(), e);
return null;
@@ -36,6 +55,29 @@ public class HttpResponseHelper {
public static JsonObject asJson(HttpResponse<?> res) {
try {
String text = asText(res);
return parseJsonText(text);
} catch (IllegalArgumentException | UnsupportedOperationException e) {
throw e;
} catch (Exception e) {
LOGGER.error("asJson: {}", e.getMessage(), e);
return JsonObject.of();
}
}
public static JsonObject asJson(Buffer body, String encoding) {
try {
String text = asText(body, encoding);
return parseJsonText(text);
} catch (IllegalArgumentException | UnsupportedOperationException e) {
throw e;
} catch (Exception e) {
LOGGER.error("asJson: {}", e.getMessage(), e);
return JsonObject.of();
}
}
private static JsonObject parseJsonText(String text) {
try {
if (text != null) {
return new JsonObject(text);
} else {
@@ -53,13 +95,26 @@ public class HttpResponseHelper {
return res.body() instanceof Buffer ? (Buffer) res.body() : Buffer.buffer(res.bodyAsString());
}
private static void ensureBodyLimit(Buffer body) {
if (body != null && body.length() > MAX_RESPONSE_BODY_BYTES) {
throw new IllegalArgumentException("响应体过大: " + body.length() + " bytes");
}
}
private static void writeLimited(StringWriter writer, char[] buffer, int len) throws IOException {
if (writer.getBuffer().length() + len > MAX_DECOMPRESSED_CHARS) {
throw new IOException("解压后响应体过大");
}
writer.write(buffer, 0, len);
}
// -------------------- 通用解压分发 --------------------
private static String decompress(Buffer compressed, String encoding) throws IOException {
return switch (encoding.toLowerCase()) {
case "gzip" -> decompressGzip(compressed);
case "deflate" -> decompressDeflate(compressed);
case "br" -> decompressBrotli(compressed);
case "zstd" -> compressed.toString(StandardCharsets.UTF_8); // 暂时返回原始内容
case "zstd" -> throw new UnsupportedOperationException("不支持的 Content-Encoding: zstd");
default -> throw new UnsupportedOperationException("不支持的 Content-Encoding: " + encoding);
};
}
@@ -74,7 +129,7 @@ public class HttpResponseHelper {
char[] buffer = new char[4096];
int n;
while ((n = isr.read(buffer)) != -1) {
writer.write(buffer, 0, n);
writeLimited(writer, buffer, n);
}
return writer.toString();
}
@@ -99,7 +154,7 @@ public class HttpResponseHelper {
char[] buffer = new char[4096];
int n;
while ((n = isr.read(buffer)) != -1) {
writer.write(buffer, 0, n);
writeLimited(writer, buffer, n);
}
return writer.toString();
}
@@ -115,7 +170,7 @@ public class HttpResponseHelper {
char[] buffer = new char[4096];
int n;
while ((n = isr.read(buffer)) != -1) {
writer.write(buffer, 0, n);
writeLimited(writer, buffer, n);
}
return writer.toString();
}

View File

@@ -1,7 +1,7 @@
package cn.qaiu.util;
import cn.qaiu.WebClientVertxInit;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientSession;
@@ -19,6 +19,10 @@ import java.util.List;
public class IpExtractor {
private static final Logger log = LoggerFactory.getLogger(IpExtractor.class);
// 使用共享的 Vertx 实例,避免每次调用创建新实例导致资源泄漏
private static final WebClient SHARED_CLIENT = WebClient.create(WebClientVertxInit.get());
private static final WebClientSession SHARED_SESSION = WebClientSession.create(SHARED_CLIENT);
public static void main(String[] args) throws InterruptedException {
@@ -43,11 +47,9 @@ public class IpExtractor {
headers.add("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36");
headers.add("Content-Type", "application/x-www-form-urlencoded");
WebClient client = WebClient.create(Vertx.vertx());
WebClientSession webClientSession = WebClientSession.create(client);
webClientSession.getAbs("https://ip.ihuan.me").putHeaders(headers).send().onSuccess(res->{
SHARED_SESSION.getAbs("https://ip.ihuan.me").putHeaders(headers).send().onSuccess(res->{
log.debug("response: {}", res.toString());
webClientSession.getAbs("https://ip.ihuan.me").putHeaders(headers).send().onSuccess(res2->{
SHARED_SESSION.getAbs("https://ip.ihuan.me").putHeaders(headers).send().onSuccess(res2->{
log.debug("response2: {}", res2.toString());
});

View File

@@ -46,34 +46,62 @@ public class JsExecUtils {
/**
* 调用执行蓝奏云js文件每次动态JS代码无法复用引擎
* 注意:使用后清理引擎引用,帮助 GC 回收 Nashorn 引擎内部资源
*/
public static ScriptObjectMirror executeDynamicJs(String jsText, String funName) throws ScriptException,
NoSuchMethodException {
ScriptEngine engine = ENGINE_MANAGER.getEngineByName("JavaScript"); // 得到脚本引擎
engine.eval(JsContent.lz + "\n" + jsText);
Invocable inv = (Invocable) engine;
//调用js中的函数
if (StringUtils.isNotEmpty(funName)) {
inv.invokeFunction(funName);
try {
engine.eval(JsContent.lz + "\n" + jsText);
Invocable inv = (Invocable) engine;
//调用js中的函数
if (StringUtils.isNotEmpty(funName)) {
inv.invokeFunction(funName);
}
return (ScriptObjectMirror) engine.get("signObj");
} finally {
// 清理引擎持有的引用,帮助 GC 回收
clearEngineBindings(engine);
}
return (ScriptObjectMirror) engine.get("signObj");
}
/**
* 调用执行js文件使用缓存的 ScriptEngineManager 创建新引擎实例)
* 注意:使用后清理引擎引用,帮助 GC 回收 Nashorn 引擎内部资源
*/
public static Object executeOtherJs(String jsText, String funName, Object ... args) throws ScriptException,
NoSuchMethodException {
ScriptEngine engine = ENGINE_MANAGER.getEngineByName("JavaScript"); // 得到脚本引擎
engine.eval(jsText);
Invocable inv = (Invocable) engine;
//调用js中的函数
if (StringUtils.isNotEmpty(funName)) {
return inv.invokeFunction(funName, args);
try {
engine.eval(jsText);
Invocable inv = (Invocable) engine;
//调用js中的函数
if (StringUtils.isNotEmpty(funName)) {
return inv.invokeFunction(funName, args);
}
throw new ScriptException("funName is null");
} finally {
// 清理引擎持有的引用,帮助 GC 回收
clearEngineBindings(engine);
}
}
/**
* 清理 ScriptEngine 的 bindings帮助 GC 回收 Nashorn 引擎资源
*/
private static void clearEngineBindings(ScriptEngine engine) {
try {
if (engine != null) {
// 清理全局 bindings
var bindings = engine.getBindings(javax.script.ScriptContext.ENGINE_SCOPE);
if (bindings != null) {
bindings.clear();
}
}
} catch (Exception ignored) {
// 清理失败不影响主流程
}
throw new ScriptException("funName is null");
}
public static String getKwSign(String s, String pwd) {

View File

@@ -1,8 +1,8 @@
package cn.qaiu.util;
import cn.qaiu.WebClientVertxInit;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.ext.web.client.HttpResponse;
@@ -47,15 +47,13 @@ public class ReqIpUtil {
}
Vertx vertx = Vertx.vertx();
WebClient webClient = WebClient.create(vertx);
// 发送GET请求
WebClientSession webClientSession = WebClientSession.create(webClient);
// 使用共享的 Vertx 实例和 WebClient避免每次创建新实例导致资源泄漏
private static final WebClient WEB_CLIENT = WebClient.create(WebClientVertxInit.get());
private static final WebClientSession WEB_CLIENT_SESSION = WebClientSession.create(WEB_CLIENT);
public void exec() {
webClientSession.getAbs(BASE_URL)
WEB_CLIENT_SESSION.getAbs(BASE_URL)
.putHeaders(headers) // 将请求头Map添加到请求中
.send(this::next);
}
@@ -67,7 +65,7 @@ public class ReqIpUtil {
HttpResponse<Buffer> res = response.result();
log.debug("Received response with status code {}", res.statusCode());
log.debug("Body: {}", res.body());
webClientSession.getAbs(BASE_URL_TEMPLATE).setTemplateParam("path", PATH1)
WEB_CLIENT_SESSION.getAbs(BASE_URL_TEMPLATE).setTemplateParam("path", PATH1)
.putHeaders(headers) // 将请求头Map添加到请求中
.send(response2 -> {
log.debug("response2: {}", response2.result().bodyAsString());