fix(web): harden cache and API services

This commit is contained in:
yukaidi
2026-06-10 21:21:52 +08:00
parent 0103841fb5
commit cbb442ceb6
14 changed files with 935 additions and 315 deletions

View File

@@ -3,9 +3,11 @@ package cn.qaiu.lz;
import cn.qaiu.WebClientVertxInit; import cn.qaiu.WebClientVertxInit;
import cn.qaiu.db.pool.JDBCPoolInit; import cn.qaiu.db.pool.JDBCPoolInit;
import cn.qaiu.lz.common.cache.CacheConfigLoader; import cn.qaiu.lz.common.cache.CacheConfigLoader;
import cn.qaiu.lz.common.cache.CacheManager;
import cn.qaiu.lz.common.interceptorImpl.RateLimiter; import cn.qaiu.lz.common.interceptorImpl.RateLimiter;
import cn.qaiu.lz.web.config.PlaygroundConfig; import cn.qaiu.lz.web.config.PlaygroundConfig;
import cn.qaiu.lz.web.service.DbService; import cn.qaiu.lz.web.service.DbService;
import cn.qaiu.lz.web.service.impl.ShoutServiceImpl;
import cn.qaiu.parser.custom.CustomParserConfig; import cn.qaiu.parser.custom.CustomParserConfig;
import cn.qaiu.parser.custom.CustomParserRegistry; import cn.qaiu.parser.custom.CustomParserRegistry;
import cn.qaiu.parser.customjs.JsScriptMetadataParser; import cn.qaiu.parser.customjs.JsScriptMetadataParser;
@@ -20,6 +22,7 @@ import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.core.shareddata.LocalMap; import io.vertx.core.shareddata.LocalMap;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.LoggerFactory;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@@ -38,22 +41,45 @@ import static cn.qaiu.vx.core.util.ConfigConstant.LOCAL;
public class AppMain { public class AppMain {
public static void main(String[] args) { public static void main(String[] args) {
// 先注册 ShutdownHookJVM 逆序执行,先注册的后执行) applyRuntimeLogLevelOverride();
// 确保关闭顺序Vert.x -> JDBCPoolInit -> JsParserExecutor Deploy deploy = Deploy.instance();
Runtime.getRuntime().addShutdownHook(new Thread(() -> { // 先阻断应用级定时任务,再让 Vert.x 停入口和 verticle。
try { deploy.addPreShutdownTask(CacheManager::cancelPeriodicCleanup);
JDBCPoolInit.instance().close(); deploy.addPreShutdownTask(ShoutServiceImpl::cancelCleanup);
} catch (Exception e) { // Vert.x 停完后再关数据库和解析器共享资源,避免请求还在路上就先关底层 client。
// ignore deploy.addPostShutdownTask(() -> JDBCPoolInit.instance().close());
} deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsParserExecutor::shutdownExecutor);
try { deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsPlaygroundExecutor::shutdownPools);
cn.qaiu.parser.customjs.JsParserExecutor.shutdownExecutor(); deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsHttpClient::shutdownSharedClient);
} catch (Exception e) { deploy.addPostShutdownTask(cn.qaiu.parser.PanBase::shutdownSharedClients);
// ignore deploy.addPostShutdownTask(cn.qaiu.parser.IPanTool::shutdownCloseAfterScheduler);
} deploy.addPostShutdownTask(cn.qaiu.parser.impl.PodTool::shutdownWorkerExecutor);
}));
// start // start
Deploy.instance().start(args, AppMain::exec); deploy.start(args, AppMain::exec);
}
private static void applyRuntimeLogLevelOverride() {
String levelName = System.getProperty("NFD_LOG_LEVEL");
if (levelName == null || levelName.isBlank()) {
levelName = System.getenv("NFD_LOG_LEVEL");
}
if (levelName == null || levelName.isBlank()) {
return;
}
try {
var level = ch.qos.logback.classic.Level.toLevel(levelName, null);
if (level == null) {
log.warn("忽略无效的 NFD_LOG_LEVEL: {}", levelName);
return;
}
var logger = LoggerFactory.getLogger("cn.qaiu");
if (logger instanceof ch.qos.logback.classic.Logger logbackLogger) {
logbackLogger.setLevel(level);
log.info("cn.qaiu 日志级别已覆盖为 {}", level);
}
} catch (Exception e) {
log.warn("覆盖 cn.qaiu 日志级别失败: {}", e.getMessage());
}
} }
/** /**
@@ -65,6 +91,8 @@ public class AppMain {
private static void exec(JsonObject jsonObject) { private static void exec(JsonObject jsonObject) {
WebClientVertxInit.init(VertxHolder.getVertxInstance()); WebClientVertxInit.init(VertxHolder.getVertxInstance());
DatabindCodec.mapper().registerModule(new JavaTimeModule()); DatabindCodec.mapper().registerModule(new JavaTimeModule());
// 演练场配置要先加载,后续启动流程才能按开关决定是否注册动态解析器。
PlaygroundConfig.loadFromJson(jsonObject);
// 限流 // 限流
if (jsonObject.containsKey("rateLimit")) { if (jsonObject.containsKey("rateLimit")) {
JsonObject rateLimit = jsonObject.getJsonObject("rateLimit"); JsonObject rateLimit = jsonObject.getJsonObject("rateLimit");
@@ -108,7 +136,12 @@ public class AppMain {
} catch (Exception e) { } catch (Exception e) {
log.warn("读取代理配置失败,使用默认页面地址: {}", e.getMessage()); log.warn("读取代理配置失败,使用默认页面地址: {}", e.getMessage());
} }
loadPlaygroundParsers(pageAddr); if (PlaygroundConfig.getInstance().isEnabled()) {
loadPlaygroundParsers(pageAddr);
} else {
log.info("演练场功能已禁用,跳过加载演练场解析器");
log.info("服务已启动,可通过 {} 访问页面", pageAddr);
}
System.out.println("启动成功: \n本地服务地址: " + addr); System.out.println("启动成功: \n本地服务地址: " + addr);
}); });
}); });
@@ -142,9 +175,6 @@ public class AppMain {
JsonObject auths = jsonObject.getJsonObject(ConfigConstant.AUTHS); JsonObject auths = jsonObject.getJsonObject(ConfigConstant.AUTHS);
localMap.put(ConfigConstant.AUTHS, auths); localMap.put(ConfigConstant.AUTHS, auths);
} }
// 演练场配置
PlaygroundConfig.loadFromJson(jsonObject);
} }
/** /**
@@ -153,34 +183,31 @@ public class AppMain {
private static void loadPlaygroundParsers(String accessAddr) { private static void loadPlaygroundParsers(String accessAddr) {
DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class); DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class);
dbService.getPlaygroundParserList().onSuccess(result -> { dbService.getEnabledPlaygroundParsersForLoad().onSuccess(result -> {
JsonArray parsers = result.getJsonArray("data"); JsonArray parsers = result.getJsonArray("data");
if (parsers != null) { if (parsers != null) {
int loadedCount = 0; int loadedCount = 0;
for (int i = 0; i < parsers.size(); i++) { for (int i = 0; i < parsers.size(); i++) {
JsonObject parser = parsers.getJsonObject(i); JsonObject parser = parsers.getJsonObject(i);
// 只注册已启用的解析器 try {
if (parser.getBoolean("enabled", false)) { String jsCode = parser.getString("jsCode");
try { if (jsCode == null || jsCode.trim().isEmpty()) {
String jsCode = parser.getString("jsCode"); log.error("加载演练场解析器失败: {} - JavaScript代码为空", parser.getString("name"));
if (jsCode == null || jsCode.trim().isEmpty()) { continue;
log.error("加载演练场解析器失败: {} - JavaScript代码为空", parser.getString("name")); }
continue; CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode);
} CustomParserRegistry.register(config);
CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode); loadedCount++;
CustomParserRegistry.register(config); log.info("已加载演练场解析器: {} ({})",
loadedCount++; config.getDisplayName(), config.getType());
log.info("已加载演练场解析器: {} ({})", } catch (Exception e) {
config.getDisplayName(), config.getType()); String parserName = parser.getString("name");
} catch (Exception e) { String errorMsg = e.getMessage();
String parserName = parser.getString("name"); log.error("加载演练场解析器失败: {} - {}", parserName, errorMsg, e);
String errorMsg = e.getMessage(); // 如果是require相关错误提供更详细的提示
log.error("加载演练场解析器失败: {} - {}", parserName, errorMsg, e); if (errorMsg != null && errorMsg.contains("require")) {
// 如果是require相关错误提供更详细的提示 log.error("提示演练场解析器不支持CommonJS模块系统require请确保代码使用ES5.1语法");
if (errorMsg != null && errorMsg.contains("require")) {
log.error("提示演练场解析器不支持CommonJS模块系统require请确保代码使用ES5.1语法");
}
} }
} }
} }

View File

@@ -5,8 +5,10 @@ import cn.qaiu.db.pool.JDBCType;
import cn.qaiu.lz.web.model.CacheLinkInfo; import cn.qaiu.lz.web.model.CacheLinkInfo;
import cn.qaiu.lz.web.model.PanFileInfo; import cn.qaiu.lz.web.model.PanFileInfo;
import cn.qaiu.lz.web.model.PanFileInfoRowMapper; import cn.qaiu.lz.web.model.PanFileInfoRowMapper;
import cn.qaiu.vx.core.util.VertxHolder;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row; import io.vertx.sqlclient.Row;
@@ -18,14 +20,24 @@ import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class CacheManager { public class CacheManager {
private static final int MAX_SHARE_KEY_LENGTH = 1024;
private final Pool jdbcPool = JDBCPoolInit.instance().getPool(); private final Pool jdbcPool = JDBCPoolInit.instance().getPool();
private final JDBCType jdbcType = JDBCPoolInit.instance().getType(); private final JDBCType jdbcType = JDBCPoolInit.instance().getType();
private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class); private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class);
public Future<CacheLinkInfo> get(String cacheKey) { public Future<CacheLinkInfo> get(String cacheKey) {
if (isOversizedShareKey(cacheKey)) {
LOGGER.warn("缓存key过长跳过缓存读取: length={}, prefix={}",
cacheKey.length(), previewShareKey(cacheKey));
return Future.succeededFuture(new CacheLinkInfo(JsonObject.of("cacheHit", false, "shareKey", cacheKey)));
}
String sql = "SELECT share_key as shareKey, direct_link as directLink, expiration FROM cache_link_info WHERE share_key = #{share_key}"; String sql = "SELECT share_key as shareKey, direct_link as directLink, expiration FROM cache_link_info WHERE share_key = #{share_key}";
String sql2 = "SELECT * FROM pan_file_info WHERE share_key = #{share_key}"; String sql2 = "SELECT * FROM pan_file_info WHERE share_key = #{share_key}";
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
@@ -65,6 +77,17 @@ public class CacheManager {
// 插入或更新缓存数据 // 插入或更新缓存数据
public void cacheShareLink(CacheLinkInfo cacheLinkInfo) { public void cacheShareLink(CacheLinkInfo cacheLinkInfo) {
if (cacheLinkInfo == null) {
LOGGER.warn("缓存信息为空,跳过缓存写入");
return;
}
if (isOversizedShareKey(cacheLinkInfo.getShareKey())) {
String shareKey = cacheLinkInfo.getShareKey();
LOGGER.warn("缓存key过长跳过缓存写入: length={}, prefix={}",
shareKey.length(), previewShareKey(shareKey));
return;
}
String sql; String sql;
if (jdbcType == JDBCType.MySQL) { if (jdbcType == JDBCType.MySQL) {
sql = """ sql = """
@@ -125,12 +148,19 @@ public class CacheManager {
} }
}).onFailure(e -> LOGGER.error("文件信息插入失败", e)); }).onFailure(e -> LOGGER.error("文件信息插入失败", e));
} }
}); })
.onFailure(e -> LOGGER.error("查询文件信息缓存失败: shareKey={}", cacheLinkInfo.getShareKey(), e));
} }
} }
// 写入网盘厂商API解析次数 // 写入网盘厂商API解析次数
public Future<Integer> updateTotalByField(String shareKey, CacheTotalField field) { public Future<Integer> updateTotalByField(String shareKey, CacheTotalField field) {
if (isOversizedShareKey(shareKey)) {
LOGGER.warn("缓存key过长跳过统计写入: length={}, prefix={}",
shareKey.length(), previewShareKey(shareKey));
return Future.succeededFuture(0);
}
Promise<Integer> promise = Promise.promise(); Promise<Integer> promise = Promise.promise();
String fieldLower = field.name().toLowerCase(); String fieldLower = field.name().toLowerCase();
String sql; String sql;
@@ -179,6 +209,12 @@ public class CacheManager {
} }
public Future<Integer> getShareKeyTotal(String shareKey, String name) { public Future<Integer> getShareKeyTotal(String shareKey, String name) {
if (isOversizedShareKey(shareKey)) {
LOGGER.warn("缓存key过长跳过统计读取: length={}, prefix={}",
shareKey.length(), previewShareKey(shareKey));
return Future.succeededFuture(null);
}
String sql = """ String sql = """
SELECT `share_key`, SUM({total_name}) AS sum_num SELECT `share_key`, SUM({total_name}) AS sum_num
FROM `api_statistics_info` FROM `api_statistics_info`
@@ -205,21 +241,56 @@ public class CacheManager {
/** /**
* 清理过期缓存记录,防止数据库无限增长 * 清理过期缓存记录,防止数据库无限增长
* @return 删除的行数 * 包括:
* 1. 清理 cache_link_info 中过期的记录
* 2. 清理 pan_file_info 中孤立的记录(对应的 cache_link_info 已被删除)
* @return 删除的总行数
*/ */
public Future<Integer> cleanupExpiredCache() { public Future<Integer> cleanupExpiredCache() {
String sql = "DELETE FROM cache_link_info WHERE expiration > 0 AND expiration < #{now}";
Map<String, Object> params = new HashMap<>();
params.put("now", System.currentTimeMillis());
Promise<Integer> promise = Promise.promise(); Promise<Integer> promise = Promise.promise();
SqlTemplate.forUpdate(jdbcPool, sql) long now = System.currentTimeMillis();
// 第一步:清理 cache_link_info 中过期的记录
String sqlDeleteExpired = "DELETE FROM cache_link_info WHERE expiration > 0 AND expiration < #{now}";
Map<String, Object> params = new HashMap<>();
params.put("now", now);
SqlTemplate.forUpdate(jdbcPool, sqlDeleteExpired)
.execute(params) .execute(params)
.onSuccess(res -> { .onSuccess(res -> {
int deleted = res.rowCount(); int deletedCache = res.rowCount();
if (deleted > 0) { if (deletedCache > 0) {
LOGGER.info("清理过期缓存记录 {} 条", deleted); LOGGER.info("清理过期缓存记录 {} 条", deletedCache);
} }
promise.complete(deleted);
// 第二步:清理 pan_file_info 中孤立的记录
// 使用 share_key 关联create_time 使用字符串格式比较yyyy-MM-dd HH:mm:ss
String sqlDeleteOrphans = """
DELETE FROM pan_file_info
WHERE share_key NOT IN (
SELECT DISTINCT share_key FROM cache_link_info WHERE share_key IS NOT NULL
)
AND (create_time IS NULL OR create_time < #{thresholdTime})
""";
Map<String, Object> orphanParams = new HashMap<>();
// 计算1天前的时间转换为 yyyy-MM-dd HH:mm:ss 格式
java.time.LocalDateTime thresholdTime = java.time.LocalDateTime.now().minusDays(1);
orphanParams.put("thresholdTime", thresholdTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
SqlTemplate.forUpdate(jdbcPool, sqlDeleteOrphans)
.execute(orphanParams)
.onSuccess(res2 -> {
int deletedOrphans = res2.rowCount();
if (deletedOrphans > 0) {
LOGGER.info("清理孤立文件信息记录 {} 条", deletedOrphans);
}
promise.complete(deletedCache + deletedOrphans);
})
.onFailure(e -> {
LOGGER.warn("清理孤立文件信息记录失败(不影响主流程)", e);
// 即使孤立记录清理失败,也返回已删除的缓存记录数
promise.complete(deletedCache);
});
}) })
.onFailure(e -> { .onFailure(e -> {
LOGGER.error("清理过期缓存失败", e); LOGGER.error("清理过期缓存失败", e);
@@ -232,31 +303,121 @@ public class CacheManager {
* 注册定时清理过期缓存任务(每小时执行一次) * 注册定时清理过期缓存任务(每小时执行一次)
* 应在应用启动后调用 * 应在应用启动后调用
*/ */
private static volatile boolean cleanupRegistered = false; private static final long CLEANUP_INTERVAL_MILLIS = 3600_000L;
private static final long CLEANUP_SHUTDOWN_WAIT_MILLIS = 5_000L;
private static final AtomicBoolean CLEANUP_REGISTERED = new AtomicBoolean(false);
private static final AtomicInteger CLEANUP_IN_FLIGHT = new AtomicInteger(0);
private static final Object CLEANUP_MONITOR = new Object();
private static volatile Long cleanupTimerId;
private static volatile Vertx cleanupVertx;
public static void registerPeriodicCleanup() { public static void registerPeriodicCleanup() {
if (cleanupRegistered) return; if (!CLEANUP_REGISTERED.compareAndSet(false, true)) {
return;
}
try { try {
io.vertx.core.Vertx vertx = cn.qaiu.vx.core.util.VertxHolder.getVertxInstance(); Vertx vertx = VertxHolder.getVertxInstance();
if (vertx == null) { cleanupVertx = vertx;
LOGGER.warn("Vertx 未就绪,缓存定时清理任务延迟注册"); cleanupTimerId = vertx.setPeriodic(CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS,
return; id -> cleanupExpiredCacheSafely());
}
cleanupRegistered = true;
vertx.setPeriodic(3600_000, 3600_000, id -> {
try {
new CacheManager().cleanupExpiredCache();
} catch (Exception e) {
LOGGER.warn("定时清理缓存任务跳过(数据库可能未就绪)", e);
}
});
LOGGER.info("缓存定时清理任务已注册(每小时执行)"); LOGGER.info("缓存定时清理任务已注册(每小时执行)");
} catch (Exception e) { } catch (Exception e) {
cleanupTimerId = null;
cleanupVertx = null;
CLEANUP_REGISTERED.set(false);
LOGGER.warn("注册缓存定时清理任务失败", e); LOGGER.warn("注册缓存定时清理任务失败", e);
} }
} }
public static void cancelPeriodicCleanup() {
Long timerId = cleanupTimerId;
Vertx vertx = cleanupVertx;
cleanupTimerId = null;
cleanupVertx = null;
CLEANUP_REGISTERED.set(false);
if (timerId == null || vertx == null) {
waitForCleanupToFinish();
return;
}
try {
if (vertx.cancelTimer(timerId)) {
LOGGER.info("缓存定时清理任务已取消");
}
} catch (Exception e) {
LOGGER.warn("取消缓存定时清理任务失败", e);
}
waitForCleanupToFinish();
}
private static void cleanupExpiredCacheSafely() {
cleanupStarted();
boolean asyncCleanupStarted = false;
try {
if (!CLEANUP_REGISTERED.get()) {
return;
}
JDBCPoolInit poolInit = JDBCPoolInit.instance();
if (poolInit == null || poolInit.getPool() == null) {
LOGGER.debug("数据库连接池未就绪,跳过缓存定时清理");
return;
}
Future<Integer> cleanupFuture = new CacheManager().cleanupExpiredCache();
asyncCleanupStarted = true;
cleanupFuture.onComplete(ar -> {
if (ar.failed()) {
LOGGER.warn("定时清理缓存失败", ar.cause());
}
cleanupFinished();
});
} catch (Exception e) {
LOGGER.warn("定时清理缓存任务跳过(数据库可能正在关闭)", e);
} finally {
if (!asyncCleanupStarted) {
cleanupFinished();
}
}
}
private static void cleanupStarted() {
CLEANUP_IN_FLIGHT.incrementAndGet();
}
private static void cleanupFinished() {
if (CLEANUP_IN_FLIGHT.decrementAndGet() <= 0) {
synchronized (CLEANUP_MONITOR) {
CLEANUP_MONITOR.notifyAll();
}
}
}
private static void waitForCleanupToFinish() {
long deadline = System.currentTimeMillis() + CLEANUP_SHUTDOWN_WAIT_MILLIS;
synchronized (CLEANUP_MONITOR) {
while (CLEANUP_IN_FLIGHT.get() > 0) {
long waitMillis = deadline - System.currentTimeMillis();
if (waitMillis <= 0) {
LOGGER.warn("等待缓存定时清理结束超时,剩余任务数: {}", CLEANUP_IN_FLIGHT.get());
return;
}
try {
CLEANUP_MONITOR.wait(waitMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("等待缓存定时清理结束被中断");
return;
}
}
}
}
public Future<Map<String, Integer>> getShareKeyTotal(String shareKey) { public Future<Map<String, Integer>> getShareKeyTotal(String shareKey) {
if (isOversizedShareKey(shareKey)) {
LOGGER.warn("缓存key过长跳过统计读取: length={}, prefix={}",
shareKey.length(), previewShareKey(shareKey));
return Future.succeededFuture(null);
}
String sql = """ String sql = """
SELECT `share_key`, SUM(cache_hit_total) AS hit_total, SUM(api_parser_total) AS parser_total SELECT `share_key`, SUM(cache_hit_total) AS hit_total, SUM(api_parser_total) AS parser_total
FROM `api_statistics_info` FROM `api_statistics_info`
@@ -287,4 +448,16 @@ public class CacheManager {
return promise.future(); return promise.future();
} }
private static boolean isOversizedShareKey(String shareKey) {
return shareKey != null && shareKey.length() > MAX_SHARE_KEY_LENGTH;
}
private static String previewShareKey(String shareKey) {
if (shareKey == null) {
return "";
}
int maxLength = 120;
return shareKey.length() <= maxLength ? shareKey : shareKey.substring(0, maxLength) + "...";
}
} }

View File

@@ -9,6 +9,10 @@ import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static cn.qaiu.vx.core.util.ConfigConstant.IGNORES_REG; import static cn.qaiu.vx.core.util.ConfigConstant.IGNORES_REG;
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
@@ -19,7 +23,15 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
@HandleSortFilter(1) @HandleSortFilter(1)
public class DefaultInterceptor implements BeforeInterceptor { public class DefaultInterceptor implements BeforeInterceptor {
protected final JsonArray ignores = SharedDataUtil.getJsonArrayForCustomConfig(IGNORES_REG); /** 预编译的忽略路径正则列表,避免每次请求重新编译 */
protected final List<Pattern> ignorePatterns;
public DefaultInterceptor() {
JsonArray ignores = SharedDataUtil.getJsonArrayForCustomConfig(IGNORES_REG);
this.ignorePatterns = ignores.stream()
.map(obj -> Pattern.compile(obj.toString()))
.collect(Collectors.toList());
}
@Override @Override
public void handle(RoutingContext ctx) { public void handle(RoutingContext ctx) {
@@ -38,28 +50,30 @@ public class DefaultInterceptor implements BeforeInterceptor {
// limit: 1000 // limit: 1000
// # 限流的时间窗口(单位秒) // # 限流的时间窗口(单位秒)
// timeWindow: 60 // timeWindow: 60
if (rateLimit.getBoolean("enable")) { if (!rateLimit.getBoolean("enable", false)) {
// 获取当前请求的路径 doNext(ctx);
String path = ctx.request().path(); return;
// 正则匹配路径
if (ignores.stream().anyMatch(ignore -> path.matches(ignore.toString()))) {
// 如果匹配到忽略的路径,则不进行限流
doNext(ctx);
return;
}
RateLimiter.checkRateLimit(ctx.request())
.onSuccess(v -> {
// 继续执行下一个拦截器
doNext(ctx);
})
.onFailure(t -> {
// 限流失败,返回错误响应
log.warn("Rate limit exceeded for path: {}", path);
ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8")
.setStatusCode(429)
.end(t.getMessage());
});
} }
// 获取当前请求的路径
String path = ctx.request().path();
// 正则匹配路径
if (ignorePatterns.stream().anyMatch(pattern -> pattern.matcher(path).matches())) {
// 如果匹配到忽略的路径,则不进行限流
doNext(ctx);
return;
}
RateLimiter.checkRateLimit(ctx.request())
.onSuccess(v -> {
// 继续执行下一个拦截器
doNext(ctx);
})
.onFailure(t -> {
// 限流失败,返回错误响应
log.warn("Rate limit exceeded for path: {}", path);
ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8")
.setStatusCode(429)
.end(t.getMessage());
});
} }
} }

View File

@@ -17,12 +17,19 @@ public class RateLimiter {
private static final Map<String, RequestInfo> ipRequestMap = new ConcurrentHashMap<>(); private static final Map<String, RequestInfo> ipRequestMap = new ConcurrentHashMap<>();
private static int MAX_REQUESTS = 10; // 最大请求次数 private static int MAX_REQUESTS = 10; // 最大请求次数
private static int MAX_ENTRIES = 10_000;
private static long TIME_WINDOW = 60 * 1000; // 时间窗口(毫秒) private static long TIME_WINDOW = 60 * 1000; // 时间窗口(毫秒)
private static String PATH_REG; // 限流路径正则 private static String PATH_REG = "/.*"; // 限流路径正则(默认匹配所有路径)
// 上次清理时间
private static volatile long lastCleanupTime = System.currentTimeMillis();
// 清理间隔30秒
private static final long CLEANUP_INTERVAL = 30 * 1000;
public static void init(JsonObject rateLimitConfig) { public static void init(JsonObject rateLimitConfig) {
MAX_REQUESTS = rateLimitConfig.getInteger("limit", 10); MAX_REQUESTS = rateLimitConfig.getInteger("limit", 10);
MAX_ENTRIES = rateLimitConfig.getInteger("maxEntries", 10_000);
TIME_WINDOW = rateLimitConfig.getInteger("timeWindow", 60) * 1000L; // 转换为毫秒 TIME_WINDOW = rateLimitConfig.getInteger("timeWindow", 60) * 1000L; // 转换为毫秒
PATH_REG = rateLimitConfig.getString("pathReg", "/.*"); PATH_REG = rateLimitConfig.getString("pathReg", "/.*");
log.info("RateLimiter initialized with max requests: {}, time window: {} ms, path regex: {}", log.info("RateLimiter initialized with max requests: {}, time window: {} ms, path regex: {}",
@@ -39,28 +46,37 @@ public class RateLimiter {
String ip = request.remoteAddress().host(); String ip = request.remoteAddress().host();
// 定期清理过期条目,防止 Map 无限增长 // 基于时间间隔的清理策略,避免 Map 无限增长
if (ipRequestMap.size() > 1000) { long now = System.currentTimeMillis();
long now = System.currentTimeMillis(); if (now - lastCleanupTime > CLEANUP_INTERVAL) {
ipRequestMap.entrySet().removeIf(entry -> now - entry.getValue().timestamp > TIME_WINDOW); cleanupExpiredEntries(now, false);
} }
RequestInfo info;
RequestInfo info = ipRequestMap.compute(ip, (key, requestInfo) -> { synchronized (RateLimiter.class) {
long currentTime = System.currentTimeMillis(); if (!ipRequestMap.containsKey(ip) && ipRequestMap.size() >= MAX_ENTRIES) {
if (requestInfo == null || currentTime - requestInfo.timestamp > TIME_WINDOW) { cleanupExpiredEntries(now, true);
// 初始化或重置计数器 if (ipRequestMap.size() >= MAX_ENTRIES) {
return new RequestInfo(1, currentTime); promise.fail("限流记录过多,请稍后再试。");
} else { return promise.future();
// 增加计数器 }
requestInfo.count.incrementAndGet();
return requestInfo;
} }
});
info = ipRequestMap.compute(ip, (key, requestInfo) -> {
if (requestInfo == null || now - requestInfo.timestamp > TIME_WINDOW) {
// 初始化或重置计数器
return new RequestInfo(1, now);
} else {
// 增加计数器
requestInfo.count.incrementAndGet();
return requestInfo;
}
});
}
if (info.count.get() > MAX_REQUESTS) { if (info.count.get() > MAX_REQUESTS) {
// 超过限制 // 超过限制
// 计算剩余时间 // 计算剩余时间
long remainingTime = TIME_WINDOW - (System.currentTimeMillis() - info.timestamp); long remainingTime = TIME_WINDOW - (now - info.timestamp);
BigDecimal bigDecimal = BigDecimal.valueOf(remainingTime / 1000.0) BigDecimal bigDecimal = BigDecimal.valueOf(remainingTime / 1000.0)
.setScale(2, RoundingMode.HALF_UP); .setScale(2, RoundingMode.HALF_UP);
promise.fail("请求次数太多了,请" + bigDecimal + "秒后再试。"); promise.fail("请求次数太多了,请" + bigDecimal + "秒后再试。");
@@ -71,6 +87,27 @@ public class RateLimiter {
return promise.future(); return promise.future();
} }
/**
* 清理过期的限流条目
* 使用 synchronized 避免并发清理
*/
private static synchronized void cleanupExpiredEntries(long now, boolean force) {
// 双重检查,避免重复清理
if (!force && now - lastCleanupTime <= CLEANUP_INTERVAL) {
return;
}
lastCleanupTime = now;
int sizeBefore = ipRequestMap.size();
if (sizeBefore > 0) {
ipRequestMap.entrySet().removeIf(entry -> now - entry.getValue().timestamp > TIME_WINDOW);
int sizeAfter = ipRequestMap.size();
if (sizeBefore > 100 || sizeAfter != sizeBefore) {
log.debug("RateLimiter 清理过期条目: {} -> {}", sizeBefore, sizeAfter);
}
}
}
private static class RequestInfo { private static class RequestInfo {
final AtomicInteger count; final AtomicInteger count;
volatile long timestamp; volatile long timestamp;

View File

@@ -19,6 +19,7 @@ public class PlaygroundConfig {
/** Token有效期24小时 */ /** Token有效期24小时 */
private static final long TOKEN_EXPIRY_MS = 24 * 60 * 60 * 1000L; private static final long TOKEN_EXPIRY_MS = 24 * 60 * 60 * 1000L;
private static final int MAX_TOKEN_COUNT = 1024;
private static final SecureRandom SECURE_RANDOM = new SecureRandom(); private static final SecureRandom SECURE_RANDOM = new SecureRandom();
@@ -59,10 +60,10 @@ public class PlaygroundConfig {
/** /**
* 生成并存储一个新的认证Token同时清理过期Token * 生成并存储一个新的认证Token同时清理过期Token
*/ */
public String generateToken() { public synchronized String generateToken() {
// 清理过期Token防止Map无限增长
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
validTokens.entrySet().removeIf(e -> now - e.getValue() > TOKEN_EXPIRY_MS); cleanupExpiredTokens(now);
evictOldestTokensIfNeeded();
// 使用SecureRandom生成32字节的密码学安全Token // 使用SecureRandom生成32字节的密码学安全Token
byte[] bytes = new byte[32]; byte[] bytes = new byte[32];
SECURE_RANDOM.nextBytes(bytes); SECURE_RANDOM.nextBytes(bytes);
@@ -78,16 +79,39 @@ public class PlaygroundConfig {
/** /**
* 校验Token是否合法且未过期 * 校验Token是否合法且未过期
*/ */
public boolean validateToken(String token) { public synchronized boolean validateToken(String token) {
if (token == null || token.isEmpty()) return false; if (token == null || token.isEmpty()) return false;
long now = System.currentTimeMillis();
cleanupExpiredTokens(now);
Long createdAt = validTokens.get(token); Long createdAt = validTokens.get(token);
if (createdAt == null) return false; if (createdAt == null) return false;
if (System.currentTimeMillis() - createdAt > TOKEN_EXPIRY_MS) { if (now - createdAt > TOKEN_EXPIRY_MS) {
validTokens.remove(token); validTokens.remove(token);
return false; return false;
} }
return true; return true;
} }
private void cleanupExpiredTokens(long now) {
validTokens.entrySet().removeIf(e -> now - e.getValue() > TOKEN_EXPIRY_MS);
}
private void evictOldestTokensIfNeeded() {
while (validTokens.size() >= MAX_TOKEN_COUNT) {
String oldestToken = null;
long oldestTime = Long.MAX_VALUE;
for (Map.Entry<String, Long> entry : validTokens.entrySet()) {
if (entry.getValue() < oldestTime) {
oldestTime = entry.getValue();
oldestToken = entry.getKey();
}
}
if (oldestToken == null) {
return;
}
validTokens.remove(oldestToken);
}
}
/** /**
* 获取单例实例 * 获取单例实例
@@ -125,5 +149,13 @@ public class PlaygroundConfig {
} else { } else {
log.info("未找到playground配置使用默认值: enabled=false, public=false"); log.info("未找到playground配置使用默认值: enabled=false, public=false");
} }
String enabledOverride = System.getProperty("NFD_PLAYGROUND_ENABLED");
if (enabledOverride == null || enabledOverride.isBlank()) {
enabledOverride = System.getenv("NFD_PLAYGROUND_ENABLED");
}
if (enabledOverride != null && !enabledOverride.isBlank()) {
cfg.enabled = Boolean.parseBoolean(enabledOverride);
log.info("Playground enabled 已被 NFD_PLAYGROUND_ENABLED 覆盖为 {}", cfg.enabled);
}
} }
} }

View File

@@ -13,6 +13,7 @@ import cn.qaiu.lz.web.model.LinkInfoResp;
import cn.qaiu.lz.web.model.StatisticsInfo; import cn.qaiu.lz.web.model.StatisticsInfo;
import cn.qaiu.lz.web.service.DbService; import cn.qaiu.lz.web.service.DbService;
import cn.qaiu.parser.PanDomainTemplate; import cn.qaiu.parser.PanDomainTemplate;
import cn.qaiu.parser.IPanTool;
import cn.qaiu.parser.ParserCreate; import cn.qaiu.parser.ParserCreate;
import cn.qaiu.parser.clientlink.ClientLinkType; import cn.qaiu.parser.clientlink.ClientLinkType;
import cn.qaiu.vx.core.annotaions.RouteHandler; import cn.qaiu.vx.core.annotaions.RouteHandler;
@@ -76,11 +77,30 @@ public class ParserApi {
private static final CacheManager cacheManager = new CacheManager(); private static final CacheManager cacheManager = new CacheManager();
private static final ServerApi serverApi = new ServerApi(); private static final ServerApi serverApi = new ServerApi();
@RouteMapping(value = "/check/:type/:key", method = RouteMethod.GET)
public void check(HttpServerResponse response, String type, String key) {
response.putHeader("Content-Type", "text/plain; charset=utf-8")
.setStatusCode(200)
.end("ok");
}
@RouteMapping(value = "/check/:type/:key", method = RouteMethod.HEAD)
public void checkHead(HttpServerResponse response, String type, String key) {
response.putHeader("Content-Type", "text/plain; charset=utf-8")
.setStatusCode(200)
.end();
}
@RouteMapping(value = "/linkInfo", method = RouteMethod.GET) @RouteMapping(value = "/linkInfo", method = RouteMethod.GET)
public Future<LinkInfoResp> parse(HttpServerRequest request, String pwd, String auth) { public Future<LinkInfoResp> parse(HttpServerRequest request, String pwd, String auth) {
Promise<LinkInfoResp> promise = Promise.promise(); Promise<LinkInfoResp> promise = Promise.promise();
String url = URLParamUtil.parserParams(request); String url = URLParamUtil.parserParams(request);
ParserCreate parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); ParserCreate parserCreate;
try {
parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd);
} catch (Exception e) {
return Future.failedFuture(e);
}
ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo(); ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo();
// 构建链接信息响应,如果有 auth 参数则附加到链接中 // 构建链接信息响应,如果有 auth 参数则附加到链接中
@@ -141,7 +161,12 @@ public class ParserApi {
@RouteMapping("/getFileList") @RouteMapping("/getFileList")
public Future<List<FileInfo>> getFileList(HttpServerRequest request, String pwd, String dirId, String uuid) { public Future<List<FileInfo>> getFileList(HttpServerRequest request, String pwd, String dirId, String uuid) {
String url = URLParamUtil.parserParams(request); String url = URLParamUtil.parserParams(request);
ParserCreate parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); ParserCreate parserCreate;
try {
parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd);
} catch (Exception e) {
return Future.failedFuture(e);
}
String linkPrefix = getLinkPrefix(request); String linkPrefix = getLinkPrefix(request);
parserCreate.getShareLinkInfo().getOtherParam().put("domainName", linkPrefix); parserCreate.getShareLinkInfo().getOtherParam().put("domainName", linkPrefix);
parserCreate.getShareLinkInfo().getOtherParam().put("_requestOrigin", linkPrefix); parserCreate.getShareLinkInfo().getOtherParam().put("_requestOrigin", linkPrefix);
@@ -151,7 +176,8 @@ public class ParserApi {
if (StringUtils.isNotBlank(uuid)) { if (StringUtils.isNotBlank(uuid)) {
parserCreate.getShareLinkInfo().getOtherParam().put("uuid", uuid); parserCreate.getShareLinkInfo().getOtherParam().put("uuid", uuid);
} }
return parserCreate.createTool().parseFileList(); IPanTool tool = parserCreate.createTool();
return IPanTool.closeAfter(tool, tool::parseFileList);
} }
// 目录解析下载文件 // 目录解析下载文件
@@ -174,7 +200,8 @@ public class ParserApi {
String linkPrefix = getLinkPrefix(request); String linkPrefix = getLinkPrefix(request);
shareLinkInfo.getOtherParam().put("domainName", linkPrefix); shareLinkInfo.getOtherParam().put("domainName", linkPrefix);
shareLinkInfo.getOtherParam().put("_requestOrigin", linkPrefix); shareLinkInfo.getOtherParam().put("_requestOrigin", linkPrefix);
return parserCreate.createTool().parseById(); IPanTool tool = parserCreate.createTool();
return IPanTool.closeAfter(tool, tool::parseById);
} }
@RouteMapping("/redirectUrl/:type/:param") @RouteMapping("/redirectUrl/:type/:param")
@@ -183,10 +210,9 @@ public class ParserApi {
getFileDownUrl(request, type, param) getFileDownUrl(request, type, param)
.onSuccess(res -> { .onSuccess(res -> {
ResponseUtil.redirect(response, res); ResponseUtil.redirect(response, res, promise);
promise.complete();
}) })
.onFailure(t -> promise.fail(t.fillInStackTrace())); .onFailure(promise::tryFail);
return promise.future(); return promise.future();
} }
@@ -212,7 +238,7 @@ public class ParserApi {
} }
String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL"); String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL");
serverApi.parseKeyJson(request, type, key).onSuccess(res -> { serverApi.parseKeyJsonForRedirect(request, type, key).onSuccess(res -> {
redirect(response, previewURL, res); redirect(response, previewURL, res);
}).onFailure(e -> { }).onFailure(e -> {
ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString())); ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString()));
@@ -248,7 +274,7 @@ public class ParserApi {
} }
String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL"); String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL");
serverApi.parseJson(request, pwd, null).onSuccess(res -> { serverApi.parseJsonForRedirect(request, pwd, null).onSuccess(res -> {
redirect(response, previewURL, res); redirect(response, previewURL, res);
}).onFailure(e -> { }).onFailure(e -> {
ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString())); ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString()));
@@ -264,10 +290,9 @@ public class ParserApi {
getFileDownUrl(request, type, param) getFileDownUrl(request, type, param)
.onSuccess(res -> { .onSuccess(res -> {
String url = viewPrefix + URLEncoder.encode(res, StandardCharsets.UTF_8); String url = viewPrefix + URLEncoder.encode(res, StandardCharsets.UTF_8);
ResponseUtil.redirect(response, url); ResponseUtil.redirect(response, url, promise);
promise.complete();
}) })
.onFailure(t -> promise.fail(t.fillInStackTrace())); .onFailure(promise::tryFail);
return promise.future(); return promise.future();
} }
@@ -320,7 +345,8 @@ public class ParserApi {
} }
// 使用默认方法解析并生成客户端链接 // 使用默认方法解析并生成客户端链接
parserCreate.createTool().parseWithClientLinks() IPanTool tool = parserCreate.createTool();
IPanTool.closeAfter(tool, tool::parseWithClientLinks)
.onSuccess(clientLinks -> { .onSuccess(clientLinks -> {
try { try {
ClientLinkResp response = buildClientLinkResponse(shareLinkInfo, clientLinks); ClientLinkResp response = buildClientLinkResponse(shareLinkInfo, clientLinks);
@@ -362,7 +388,8 @@ public class ParserApi {
URLParamUtil.addParam(parserCreate); URLParamUtil.addParam(parserCreate);
// 使用默认方法解析并生成客户端链接 // 使用默认方法解析并生成客户端链接
parserCreate.createTool().parseWithClientLinks() IPanTool tool = parserCreate.createTool();
IPanTool.closeAfter(tool, tool::parseWithClientLinks)
.onSuccess(clientLinks -> { .onSuccess(clientLinks -> {
try { try {
String clientLink = extractClientLinkByType(clientLinks, clientType); String clientLink = extractClientLinkByType(clientLinks, clientType);

View File

@@ -123,6 +123,10 @@ public class PlaygroundApi {
// 获取密码 // 获取密码
JsonObject body = ctx.body().asJsonObject(); JsonObject body = ctx.body().asJsonObject();
if (body == null) {
promise.complete(JsonResult.error("请求体不能为空").toJsonObject());
return promise.future();
}
String password = body.getString("password"); String password = body.getString("password");
if (StringUtils.isBlank(password)) { if (StringUtils.isBlank(password)) {
@@ -249,82 +253,84 @@ public class PlaygroundApi {
ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo(); ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo();
// 创建演练场执行器 // 创建演练场执行器
JsPlaygroundExecutor executor = new JsPlaygroundExecutor(shareLinkInfo, jsCode); final JsPlaygroundExecutor executor = new JsPlaygroundExecutor(shareLinkInfo, jsCode);
// 根据方法类型选择执行,并异步处理结果 // 根据方法类型选择执行,并异步处理结果
Future<Object> executionFuture; Future<Object> executionFuture;
switch (method) { try {
case "parse": switch (method) {
executionFuture = executor.executeParseAsync().map(r -> (Object) r); case "parse":
break; executionFuture = executor.executeParseAsync().map(r -> (Object) r);
case "parseFileList": break;
executionFuture = executor.executeParseFileListAsync().map(r -> (Object) r); case "parseFileList":
break; executionFuture = executor.executeParseFileListAsync().map(r -> (Object) r);
case "parseById": break;
executionFuture = executor.executeParseByIdAsync().map(r -> (Object) r); case "parseById":
break; executionFuture = executor.executeParseByIdAsync().map(r -> (Object) r);
default: break;
promise.fail(new IllegalArgumentException("未知的方法类型: " + method)); default:
return promise.future(); executor.close();
promise.fail(new IllegalArgumentException("未知的方法类型: " + method));
return promise.future();
}
} catch (Exception ex) {
// 同步异常路径executor 已创建但 Future 未注册,需要手动关闭
executor.close();
throw ex;
} }
// 异步处理执行结果 // 异步处理执行结果finally 中统一释放 executor避免响应构建异常导致资源泄漏
executionFuture.onSuccess(result -> { executionFuture.onComplete(ar -> {
log.debug("执行成功,结果类型: {}, 结果值: {}", try {
result != null ? result.getClass().getSimpleName() : "null", long executionTime = System.currentTimeMillis() - startTime;
result); List<JsPlaygroundLogger.LogEntry> logEntries = executor.getLogs();
List<PlaygroundTestResp.LogEntry> respLogs = logEntries.stream()
// 获取日志 .map(entry -> PlaygroundTestResp.LogEntry.builder()
List<JsPlaygroundLogger.LogEntry> logEntries = executor.getLogs(); .level(entry.getLevel())
log.debug("获取到 {} 条日志记录", logEntries.size()); .message(entry.getMessage())
.timestamp(entry.getTimestamp())
List<PlaygroundTestResp.LogEntry> respLogs = logEntries.stream() .source(entry.getSource())
.map(entry -> PlaygroundTestResp.LogEntry.builder() .build())
.level(entry.getLevel()) .collect(Collectors.toList());
.message(entry.getMessage())
.timestamp(entry.getTimestamp())
.source(entry.getSource()) // 使用日志条目的来源标识
.build())
.collect(Collectors.toList());
long executionTime = System.currentTimeMillis() - startTime; if (ar.succeeded()) {
Object result = ar.result();
log.debug("执行成功,结果类型: {}, 结果摘要: {}",
result != null ? result.getClass().getSimpleName() : "null",
summarizeResult(result));
// 构建响应 PlaygroundTestResp response = PlaygroundTestResp.builder()
PlaygroundTestResp response = PlaygroundTestResp.builder() .success(true)
.success(true) .result(result)
.result(result) .logs(respLogs)
.logs(respLogs) .executionTime(executionTime)
.executionTime(executionTime) .build();
.build();
JsonObject jsonResponse = JsonObject.mapFrom(response); log.debug("测试成功响应: success=true, logCount={}", respLogs.size());
log.debug("测试成功响应: {}", jsonResponse.encodePrettily()); promise.complete(JsonObject.mapFrom(response));
promise.complete(jsonResponse); } else {
}).onFailure(e -> { Throwable e = ar.cause();
long executionTime = System.currentTimeMillis() - startTime; String errorMessage = e == null ? "执行失败" : e.getMessage();
String errorMessage = e.getMessage(); log.error("演练场执行失败", e);
log.error("演练场执行失败", e); PlaygroundTestResp response = PlaygroundTestResp.builder()
.success(false)
.error(errorMessage)
.executionTime(executionTime)
.logs(respLogs)
.build();
// 尝试获取已有的日志 promise.complete(JsonObject.mapFrom(response));
List<JsPlaygroundLogger.LogEntry> logEntries = executor.getLogs(); }
List<PlaygroundTestResp.LogEntry> respLogs = logEntries.stream() } catch (Exception e) {
.map(entry -> PlaygroundTestResp.LogEntry.builder() log.error("构建演练场响应失败", e);
.level(entry.getLevel()) promise.tryComplete(JsonObject.mapFrom(PlaygroundTestResp.builder()
.message(entry.getMessage()) .success(false)
.timestamp(entry.getTimestamp()) .error("构建响应失败: " + e.getMessage())
.source(entry.getSource()) // 使用日志条目的来源标识 .build()));
.build()) } finally {
.collect(Collectors.toList()); executor.close();
}
PlaygroundTestResp response = PlaygroundTestResp.builder()
.success(false)
.error(errorMessage)
.executionTime(executionTime)
.logs(respLogs)
.build();
promise.complete(JsonObject.mapFrom(response));
}); });
} catch (Exception e) { } catch (Exception e) {
@@ -462,19 +468,7 @@ public class PlaygroundApi {
} }
// 检查type是否已存在 // 检查type是否已存在
dbService.getPlaygroundParserList().onSuccess(listResult -> { dbService.playgroundParserTypeExists(type, null).onSuccess(exists -> {
var list = listResult.getJsonArray("data");
boolean exists = false;
if (list != null) {
for (int i = 0; i < list.size(); i++) {
var item = list.getJsonObject(i);
if (type.equals(item.getString("type"))) {
exists = true;
break;
}
}
}
if (exists) { if (exists) {
promise.complete(JsonResult.error("解析器类型 " + type + " 已存在,请使用其他类型标识").toJsonObject()); promise.complete(JsonResult.error("解析器类型 " + type + " 已存在,请使用其他类型标识").toJsonObject());
return; return;
@@ -493,25 +487,29 @@ public class PlaygroundApi {
parser.put("ip", getClientIp(ctx.request())); parser.put("ip", getClientIp(ctx.request()));
parser.put("enabled", true); parser.put("enabled", true);
try {
CustomParserRegistry.register(config);
} catch (Exception e) {
log.error("注册解析器失败", e);
promise.complete(JsonResult.error("保存失败,注册解析器失败: " + e.getMessage()).toJsonObject());
return;
}
dbService.savePlaygroundParser(parser).onSuccess(result -> { dbService.savePlaygroundParser(parser).onSuccess(result -> {
// 保存成功后,立即注册到解析器系统 if (!result.getBoolean("success", false)) {
try { CustomParserRegistry.unregister(type);
CustomParserRegistry.register(config); promise.complete(JsonResult.error(result.getString("msg", "保存失败")).toJsonObject());
log.info("已注册演练场解析器: {} ({})", displayName, type); return;
promise.complete(JsonResult.success("保存并注册成功").toJsonObject());
} catch (Exception e) {
log.error("注册解析器失败", e);
// 虽然注册失败,但保存成功了,返回警告
promise.complete(JsonResult.success(
"保存成功,但注册失败(重启服务后会自动加载): " + e.getMessage()
).toJsonObject());
} }
log.info("已注册演练场解析器: {} ({})", displayName, type);
promise.complete(JsonResult.success("保存并注册成功").toJsonObject());
}).onFailure(e -> { }).onFailure(e -> {
CustomParserRegistry.unregister(type);
log.error("保存解析器失败", e); log.error("保存解析器失败", e);
promise.complete(JsonResult.error("保存失败: " + e.getMessage()).toJsonObject()); promise.complete(JsonResult.error("保存失败: " + e.getMessage()).toJsonObject());
}); });
}).onFailure(e -> { }).onFailure(e -> {
log.error("获取解析器列表失败", e); log.error("检查解析器类型失败", e);
promise.complete(JsonResult.error("检查解析器失败: " + e.getMessage()).toJsonObject()); promise.complete(JsonResult.error("检查解析器失败: " + e.getMessage()).toJsonObject());
}); });
}).onFailure(e -> { }).onFailure(e -> {
@@ -557,6 +555,11 @@ public class PlaygroundApi {
return promise.future(); return promise.future();
} }
if (jsCode.length() > MAX_CODE_LENGTH) {
promise.complete(JsonResult.error("代码长度超过限制最大128KB当前长度: " + jsCode.length() + " 字节").toJsonObject());
return promise.future();
}
// 解析元数据 // 解析元数据
try { try {
var config = JsScriptMetadataParser.parseScript(jsCode); var config = JsScriptMetadataParser.parseScript(jsCode);
@@ -570,6 +573,7 @@ public class PlaygroundApi {
boolean enabled = body.getBoolean("enabled", true); boolean enabled = body.getBoolean("enabled", true);
JsonObject parser = new JsonObject(); JsonObject parser = new JsonObject();
parser.put("type", type);
parser.put("name", name); parser.put("name", name);
parser.put("displayName", displayName); parser.put("displayName", displayName);
parser.put("description", description); parser.put("description", description);
@@ -579,29 +583,73 @@ public class PlaygroundApi {
parser.put("jsCode", jsCode); parser.put("jsCode", jsCode);
parser.put("enabled", enabled); parser.put("enabled", enabled);
dbService.updatePlaygroundParser(id, parser).onSuccess(result -> { dbService.getPlaygroundParserById(id).onSuccess(oldResult -> {
// 更新成功后,重新注册解析器 String oldType = null;
try { if (oldResult.getBoolean("success", false) && oldResult.getJsonObject("data") != null) {
if (enabled) { oldType = oldResult.getJsonObject("data").getString("type");
// 先注销旧的(如果存在) } else {
CustomParserRegistry.unregister(type); promise.complete(JsonResult.error("解析器不存在").toJsonObject());
// 重新注册新的 return;
CustomParserRegistry.register(config);
log.info("已重新注册演练场解析器: {} ({})", displayName, type);
} else {
// 禁用时注销
CustomParserRegistry.unregister(type);
log.info("已注销演练场解析器: {}", type);
}
promise.complete(JsonResult.success("更新并重新注册成功").toJsonObject());
} catch (Exception e) {
log.error("重新注册解析器失败", e);
promise.complete(JsonResult.success(
"更新成功,但注册失败(重启服务后会自动加载): " + e.getMessage()
).toJsonObject());
} }
final String oldRegisteredType = oldType;
dbService.playgroundParserTypeExists(type, id).onSuccess(exists -> {
if (exists) {
promise.complete(JsonResult.error("解析器类型 " + type + " 已被其他解析器使用").toJsonObject());
return;
}
var oldConfig = CustomParserRegistry.get(oldRegisteredType);
boolean removedOldType = false;
boolean registeredNewType = false;
try {
if (StringUtils.isNotBlank(oldRegisteredType)) {
removedOldType = CustomParserRegistry.unregister(oldRegisteredType);
}
if (enabled) {
CustomParserRegistry.register(config);
registeredNewType = true;
}
} catch (Exception e) {
if (removedOldType && oldConfig != null) {
try {
CustomParserRegistry.register(oldConfig);
} catch (Exception restoreError) {
log.warn("恢复旧解析器注册失败: {}", oldRegisteredType, restoreError);
}
}
log.error("预注册解析器失败", e);
promise.complete(JsonResult.error("更新失败,注册解析器失败: " + e.getMessage()).toJsonObject());
return;
}
final boolean shouldRollbackNewType = registeredNewType;
final boolean shouldRestoreOldType = removedOldType;
final var oldParserConfig = oldConfig;
dbService.updatePlaygroundParser(id, parser).onSuccess(result -> {
if (!result.getBoolean("success", false)) {
rollbackParserRegistration(type, oldRegisteredType, shouldRollbackNewType,
shouldRestoreOldType, oldParserConfig);
promise.complete(JsonResult.error(result.getString("msg", "更新失败")).toJsonObject());
return;
}
if (!enabled) {
log.info("已注销演练场解析器: {}", oldRegisteredType);
} else {
log.info("已重新注册演练场解析器: {} ({})", displayName, type);
}
promise.complete(JsonResult.success("更新并重新注册成功").toJsonObject());
}).onFailure(e -> {
rollbackParserRegistration(type, oldRegisteredType, shouldRollbackNewType,
shouldRestoreOldType, oldParserConfig);
log.error("更新解析器失败", e);
promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject());
});
}).onFailure(e -> {
log.error("检查解析器类型失败", e);
promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject());
});
}).onFailure(e -> { }).onFailure(e -> {
log.error("更新解析器失败", e); log.error("获取旧解析器信息失败", e);
promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject()); promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject());
}); });
@@ -617,6 +665,24 @@ public class PlaygroundApi {
return promise.future(); return promise.future();
} }
private void rollbackParserRegistration(String newType, String oldType, boolean rollbackNewType,
boolean restoreOldType, cn.qaiu.parser.custom.CustomParserConfig oldConfig) {
if (rollbackNewType) {
try {
CustomParserRegistry.unregister(newType);
} catch (Exception rollbackError) {
log.warn("回滚新解析器注册失败: {}", newType, rollbackError);
}
}
if (restoreOldType && oldConfig != null) {
try {
CustomParserRegistry.register(oldConfig);
} catch (Exception restoreError) {
log.warn("恢复旧解析器注册失败: {}", oldType, restoreError);
}
}
}
/** /**
* 删除解析器 * 删除解析器
*/ */
@@ -695,5 +761,18 @@ public class PlaygroundApi {
} }
return ip; return ip;
} }
private String summarizeResult(Object result) {
if (result == null) {
return "null";
}
if (result instanceof String str) {
return "String(length=" + str.length() + ")";
}
if (result instanceof List<?> list) {
return "List(size=" + list.size() + ")";
}
return result.getClass().getSimpleName();
}
} }

View File

@@ -29,6 +29,8 @@ import lombok.extern.slf4j.Slf4j;
@RouteHandler("/") @RouteHandler("/")
public class ServerApi { public class ServerApi {
private static final String SKIP_CLIENT_LINKS = "_skipClientLinks";
private final CacheService cacheService = AsyncServiceUtil.getAsyncServiceInstance(CacheService.class); private final CacheService cacheService = AsyncServiceUtil.getAsyncServiceInstance(CacheService.class);
private final DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class); private final DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class);
@@ -38,16 +40,15 @@ public class ServerApi {
String url = URLParamUtil.parserParams(request); String url = URLParamUtil.parserParams(request);
// 构建 otherParam包含 UA 和解码后的认证参数 // 构建 otherParam包含 UA 和解码后的认证参数
JsonObject otherParam = buildOtherParam(request, auth); JsonObject otherParam = buildOtherParam(request, auth, true);
cacheService.getCachedByShareUrlAndPwd(url, pwd, otherParam) cacheService.getCachedByShareUrlAndPwd(url, pwd, otherParam)
.onSuccess(res -> ResponseUtil.redirect( .onSuccess(res -> ResponseUtil.redirect(
response.putHeader("nfd-cache-hit", res.getCacheHit().toString()) addCacheHeaders(response, res),
.putHeader("nfd-cache-expires", res.getExpires()),
res.getDirectLink(), promise)) res.getDirectLink(), promise))
.onFailure(t -> { .onFailure(t -> {
recordDonatedAccountFailureIfNeeded(otherParam, t); recordDonatedAccountFailureIfNeeded(otherParam, t);
promise.fail(t.fillInStackTrace()); promise.tryFail(t);
}); });
return promise.future(); return promise.future();
} }
@@ -60,6 +61,13 @@ public class ServerApi {
.onFailure(t -> recordDonatedAccountFailureIfNeeded(otherParam, t)); .onFailure(t -> recordDonatedAccountFailureIfNeeded(otherParam, t));
} }
public Future<CacheLinkInfo> parseJsonForRedirect(HttpServerRequest request, String pwd, String auth) {
String url = URLParamUtil.parserParams(request);
JsonObject otherParam = buildOtherParam(request, auth, true);
return cacheService.getCachedByShareUrlAndPwd(url, pwd, otherParam)
.onFailure(t -> recordDonatedAccountFailureIfNeeded(otherParam, t));
}
@RouteMapping(value = "/json/:type/:key", method = RouteMethod.GET) @RouteMapping(value = "/json/:type/:key", method = RouteMethod.GET)
public Future<CacheLinkInfo> parseKeyJson(HttpServerRequest request, String type, String key) { public Future<CacheLinkInfo> parseKeyJson(HttpServerRequest request, String type, String key) {
String pwd = ""; String pwd = "";
@@ -72,6 +80,18 @@ public class ServerApi {
return cacheService.getCachedByShareKeyAndPwd(type, key, pwd, JsonObject.of("UA",request.headers().get("user-agent"), "_requestOrigin", origin)); return cacheService.getCachedByShareKeyAndPwd(type, key, pwd, JsonObject.of("UA",request.headers().get("user-agent"), "_requestOrigin", origin));
} }
public Future<CacheLinkInfo> parseKeyJsonForRedirect(HttpServerRequest request, String type, String key) {
String pwd = "";
if (key.contains("@")) {
String[] keys = key.split("@");
key = keys[0];
pwd = keys[1];
}
String origin = resolveOrigin(request);
return cacheService.getCachedByShareKeyAndPwd(type, key, pwd,
JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", origin, SKIP_CLIENT_LINKS, true));
}
@RouteMapping(value = "/:type/:key", method = RouteMethod.GET) @RouteMapping(value = "/:type/:key", method = RouteMethod.GET)
public Future<Void> parseKey(HttpServerResponse response, HttpServerRequest request, String type, String key) { public Future<Void> parseKey(HttpServerResponse response, HttpServerRequest request, String type, String key) {
Promise<Void> promise = Promise.promise(); Promise<Void> promise = Promise.promise();
@@ -82,15 +102,23 @@ public class ServerApi {
pwd = keys[1]; pwd = keys[1];
} }
String origin = resolveOrigin(request); String origin = resolveOrigin(request);
cacheService.getCachedByShareKeyAndPwd(type, key, pwd, JsonObject.of("UA",request.headers().get("user-agent"), "_requestOrigin", origin)) cacheService.getCachedByShareKeyAndPwd(type, key, pwd,
JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", origin, SKIP_CLIENT_LINKS, true))
.onSuccess(res -> ResponseUtil.redirect( .onSuccess(res -> ResponseUtil.redirect(
response.putHeader("nfd-cache-hit", res.getCacheHit().toString()) addCacheHeaders(response, res),
.putHeader("nfd-cache-expires", res.getExpires()),
res.getDirectLink(), promise)) res.getDirectLink(), promise))
.onFailure(t -> promise.fail(t.fillInStackTrace())); .onFailure(promise::tryFail);
return promise.future(); return promise.future();
} }
private static HttpServerResponse addCacheHeaders(HttpServerResponse response, CacheLinkInfo cacheLinkInfo) {
if (response.ended() || response.closed()) {
return response;
}
return response.putHeader("nfd-cache-hit", cacheLinkInfo.getCacheHit().toString())
.putHeader("nfd-cache-expires", cacheLinkInfo.getExpires());
}
/** /**
* 解析请求来源地址,支持反向代理 * 解析请求来源地址,支持反向代理
*/ */
@@ -114,7 +142,14 @@ public class ServerApi {
* @return JsonObject * @return JsonObject
*/ */
private JsonObject buildOtherParam(HttpServerRequest request, String auth) { private JsonObject buildOtherParam(HttpServerRequest request, String auth) {
return buildOtherParam(request, auth, false);
}
private JsonObject buildOtherParam(HttpServerRequest request, String auth, boolean skipClientLinks) {
JsonObject otherParam = JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", resolveOrigin(request)); JsonObject otherParam = JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", resolveOrigin(request));
if (skipClientLinks) {
otherParam.put(SKIP_CLIENT_LINKS, true);
}
// 解码认证参数 // 解码认证参数
if (auth != null && !auth.isEmpty()) { if (auth != null && !auth.isEmpty()) {

View File

@@ -25,6 +25,11 @@ public interface DbService extends BaseAsyncService {
*/ */
Future<JsonObject> getPlaygroundParserList(); Future<JsonObject> getPlaygroundParserList();
/**
* 获取启动时需要注册的已启用演练场解析器
*/
Future<JsonObject> getEnabledPlaygroundParsersForLoad();
/** /**
* 保存演练场解析器 * 保存演练场解析器
*/ */
@@ -45,6 +50,11 @@ public interface DbService extends BaseAsyncService {
*/ */
Future<Integer> getPlaygroundParserCount(); Future<Integer> getPlaygroundParserCount();
/**
* 检查演练场解析器类型是否已存在
*/
Future<Boolean> playgroundParserTypeExists(String type, Long excludeId);
/** /**
* 根据ID获取演练场解析器 * 根据ID获取演练场解析器
*/ */

View File

@@ -27,6 +27,8 @@ import java.util.Map;
@Slf4j @Slf4j
public class CacheServiceImpl implements CacheService { public class CacheServiceImpl implements CacheService {
private static final String SKIP_CLIENT_LINKS = "_skipClientLinks";
private final CacheManager cacheManager = new CacheManager(); private final CacheManager cacheManager = new CacheManager();
static { static {
@@ -62,10 +64,14 @@ public class CacheServiceImpl implements CacheService {
try { try {
tool = parserCreate.createTool(); tool = parserCreate.createTool();
} catch (Exception e) { } catch (Exception e) {
promise.fail(e.getCause().getCause()); Throwable cause = e;
while (cause.getCause() != null) {
cause = cause.getCause();
}
promise.fail(cause);
return; return;
} }
tool.parse().onSuccess(redirectUrl -> { IPanTool.closeAfter(tool, tool::parse).onSuccess(redirectUrl -> {
// 使用 effectiveCacheDuration // 使用 effectiveCacheDuration
long expires = System.currentTimeMillis() + effectiveCacheDuration * 60 * 1000L; long expires = System.currentTimeMillis() + effectiveCacheDuration * 60 * 1000L;
result.setDirectLink(redirectUrl); result.setDirectLink(redirectUrl);
@@ -74,7 +80,7 @@ public class CacheServiceImpl implements CacheService {
result.setExpires(generateDate(expires)); result.setExpires(generateDate(expires));
// 调试日志检查解析器返回的otherParam // 调试日志检查解析器返回的otherParam
log.info("[解析完成] shareKey={}, otherParam.keys={}, hasFileInfo={}", log.debug("[解析完成] shareKey={}, otherParam.keys={}, hasFileInfo={}",
cacheKey, cacheKey,
shareLinkInfo.getOtherParam().keySet(), shareLinkInfo.getOtherParam().keySet(),
shareLinkInfo.getOtherParam().containsKey("fileInfo")); shareLinkInfo.getOtherParam().containsKey("fileInfo"));
@@ -90,17 +96,19 @@ public class CacheServiceImpl implements CacheService {
FileInfo fileInfo = (FileInfo) shareLinkInfo.getOtherParam().get("fileInfo"); FileInfo fileInfo = (FileInfo) shareLinkInfo.getOtherParam().get("fileInfo");
result.setFileInfo(fileInfo); result.setFileInfo(fileInfo);
cacheLinkInfo.setFileInfo(fileInfo); cacheLinkInfo.setFileInfo(fileInfo);
log.info("[设置文件信息] shareKey={}, fileName={}, size={}", log.debug("[设置文件信息] shareKey={}, fileName={}, size={}",
cacheKey, fileInfo.getFileName(), fileInfo.getSize()); cacheKey, fileInfo.getFileName(), fileInfo.getSize());
} catch (Exception e) { } catch (Exception e) {
log.error("文件对象转换异常: shareKey={}", cacheKey, e); log.error("文件对象转换异常: shareKey={}", cacheKey, e);
} }
} else { } else {
log.warn("[文件信息缺失] 解析器未返回fileInfo: shareKey={}, otherParam.keys={}", log.debug("[文件信息缺失] 解析器未返回fileInfo: shareKey={}, otherParam.keys={}",
cacheKey, shareLinkInfo.getOtherParam().keySet()); cacheKey, shareLinkInfo.getOtherParam().keySet());
} }
// 传递 downloadHeaders 并生成下载命令 if (shouldGenerateClientLinks(shareLinkInfo)) {
processDownloadHeaders(shareLinkInfo, cacheLinkInfo, result); // 传递 downloadHeaders 并生成下载命令
processDownloadHeaders(shareLinkInfo, cacheLinkInfo, result);
}
promise.complete(result); promise.complete(result);
// 更新缓存 // 更新缓存
cacheManager.cacheShareLink(cacheLinkInfo); cacheManager.cacheShareLink(cacheLinkInfo);
@@ -110,19 +118,21 @@ public class CacheServiceImpl implements CacheService {
// 缓存命中,生成过期时间并生成下载命令 // 缓存命中,生成过期时间并生成下载命令
result.setExpires(generateDate(result.getExpiration())); result.setExpires(generateDate(result.getExpiration()));
// 初始化 otherParam如果为空 if (shouldGenerateClientLinks(shareLinkInfo)) {
if (result.getOtherParam() == null) { // 初始化 otherParam如果为空
result.setOtherParam(new HashMap<>()); if (result.getOtherParam() == null) {
result.setOtherParam(new HashMap<>());
}
// 生成下载命令aria2、curl
generateDownloadCommands(result);
} }
// 生成下载命令aria2、curl
generateDownloadCommands(result);
promise.complete(result); promise.complete(result);
cacheManager.updateTotalByField(cacheKey, CacheTotalField.CACHE_HIT_TOTAL) cacheManager.updateTotalByField(cacheKey, CacheTotalField.CACHE_HIT_TOTAL)
.onFailure(e -> log.error("更新缓存命中计数失败: cacheKey={}", cacheKey, e)); .onFailure(e -> log.error("更新缓存命中计数失败: cacheKey={}", cacheKey, e));
} }
}).onFailure(t -> promise.fail(t.fillInStackTrace())); }).onFailure(promise::tryFail);
return promise.future(); return promise.future();
} }
@@ -131,6 +141,13 @@ public class CacheServiceImpl implements CacheService {
return DateFormatUtils.format(new Date(ts), "yyyy-MM-dd HH:mm:ss"); return DateFormatUtils.format(new Date(ts), "yyyy-MM-dd HH:mm:ss");
} }
private boolean shouldGenerateClientLinks(ShareLinkInfo shareLinkInfo) {
if (shareLinkInfo == null || shareLinkInfo.getOtherParam() == null) {
return true;
}
return !Boolean.TRUE.equals(shareLinkInfo.getOtherParam().get(SKIP_CLIENT_LINKS));
}
/** /**
* 处理下载请求头并生成下载命令 * 处理下载请求头并生成下载命令
* 从 shareLinkInfo 中提取 downloadHeaders传递到 cacheLinkInfo 和 result * 从 shareLinkInfo 中提取 downloadHeaders传递到 cacheLinkInfo 和 result
@@ -147,7 +164,7 @@ public class CacheServiceImpl implements CacheService {
Map<String, String> headers = (Map<String, String>) shareLinkInfo.getOtherParam().get("downloadHeaders"); Map<String, String> headers = (Map<String, String>) shareLinkInfo.getOtherParam().get("downloadHeaders");
if (headers != null) { if (headers != null) {
downloadHeaders = headers; downloadHeaders = headers;
log.info("从shareLinkInfo提取downloadHeaders: shareKey={}, 请求头数量={}", log.debug("从shareLinkInfo提取downloadHeaders: shareKey={}, 请求头数量={}",
cacheLinkInfo.getShareKey(), downloadHeaders.size()); cacheLinkInfo.getShareKey(), downloadHeaders.size());
} }
} }
@@ -255,14 +272,24 @@ public class CacheServiceImpl implements CacheService {
@Override @Override
public Future<CacheLinkInfo> getCachedByShareKeyAndPwd(String type, String shareKey, String pwd, JsonObject otherParam) { public Future<CacheLinkInfo> getCachedByShareKeyAndPwd(String type, String shareKey, String pwd, JsonObject otherParam) {
ParserCreate parserCreate = ParserCreate.fromType(type).shareKey(shareKey).setShareLinkInfoPwd(pwd); ParserCreate parserCreate;
try {
parserCreate = ParserCreate.fromType(type).shareKey(shareKey).setShareLinkInfoPwd(pwd);
} catch (Exception e) {
return Future.failedFuture(e);
}
parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap()); parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap());
return getAndSaveCachedShareLink(parserCreate); return getAndSaveCachedShareLink(parserCreate);
} }
@Override @Override
public Future<CacheLinkInfo> getCachedByShareUrlAndPwd(String shareUrl, String pwd, JsonObject otherParam) { public Future<CacheLinkInfo> getCachedByShareUrlAndPwd(String shareUrl, String pwd, JsonObject otherParam) {
ParserCreate parserCreate = ParserCreate.fromShareUrl(shareUrl).setShareLinkInfoPwd(pwd); ParserCreate parserCreate;
try {
parserCreate = ParserCreate.fromShareUrl(shareUrl).setShareLinkInfoPwd(pwd);
} catch (Exception e) {
return Future.failedFuture(e);
}
parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap()); parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap());
// 检查是否有临时认证参数 // 检查是否有临时认证参数

View File

@@ -85,33 +85,18 @@ public class DbServiceImpl implements DbService {
public Future<JsonObject> getPlaygroundParserList() { public Future<JsonObject> getPlaygroundParserList() {
JDBCPool client = JDBCPoolInit.instance().getPool(); JDBCPool client = JDBCPoolInit.instance().getPool();
Promise<JsonObject> promise = Promise.promise(); Promise<JsonObject> promise = Promise.promise();
String sql = "SELECT * FROM playground_parser ORDER BY create_time DESC"; String sql = """
SELECT id, name, type, display_name, description, author, version,
match_pattern, ip, create_time, update_time, enabled
FROM playground_parser
ORDER BY create_time DESC
LIMIT 100
""";
client.query(sql).execute().onSuccess(rows -> { client.query(sql).execute().onSuccess(rows -> {
List<JsonObject> list = new ArrayList<>(); List<JsonObject> list = new ArrayList<>();
for (Row row : rows) { for (Row row : rows) {
JsonObject parser = new JsonObject(); list.add(toPlaygroundParserJson(row, false));
parser.put("id", row.getLong("id"));
parser.put("name", row.getString("name"));
parser.put("type", row.getString("type"));
parser.put("displayName", row.getString("display_name"));
parser.put("description", row.getString("description"));
parser.put("author", row.getString("author"));
parser.put("version", row.getString("version"));
parser.put("matchPattern", row.getString("match_pattern"));
parser.put("jsCode", row.getString("js_code"));
parser.put("ip", row.getString("ip"));
// 将LocalDateTime转换为字符串格式避免序列化为数组
var createTime = row.getLocalDateTime("create_time");
if (createTime != null) {
parser.put("createTime", createTime.toString().replace("T", " "));
}
var updateTime = row.getLocalDateTime("update_time");
if (updateTime != null) {
parser.put("updateTime", updateTime.toString().replace("T", " "));
}
parser.put("enabled", row.getBoolean("enabled"));
list.add(parser);
} }
promise.complete(JsonResult.data(list).toJsonObject()); promise.complete(JsonResult.data(list).toJsonObject());
}).onFailure(e -> { }).onFailure(e -> {
@@ -122,6 +107,33 @@ public class DbServiceImpl implements DbService {
return promise.future(); return promise.future();
} }
@Override
public Future<JsonObject> getEnabledPlaygroundParsersForLoad() {
JDBCPool client = JDBCPoolInit.instance().getPool();
Promise<JsonObject> promise = Promise.promise();
String sql = """
SELECT id, name, type, display_name, description, author, version,
match_pattern, js_code, ip, create_time, update_time, enabled
FROM playground_parser
WHERE enabled = TRUE
ORDER BY update_time DESC, create_time DESC
LIMIT 100
""";
client.query(sql).execute().onSuccess(rows -> {
List<JsonObject> list = new ArrayList<>();
for (Row row : rows) {
list.add(toPlaygroundParserJson(row, true));
}
promise.complete(JsonResult.data(list).toJsonObject());
}).onFailure(e -> {
log.error("getEnabledPlaygroundParsersForLoad failed", e);
promise.fail(e);
});
return promise.future();
}
@Override @Override
public Future<JsonObject> savePlaygroundParser(JsonObject parser) { public Future<JsonObject> savePlaygroundParser(JsonObject parser) {
JDBCPool client = JDBCPoolInit.instance().getPool(); JDBCPool client = JDBCPoolInit.instance().getPool();
@@ -164,13 +176,14 @@ public class DbServiceImpl implements DbService {
String sql = """ String sql = """
UPDATE playground_parser UPDATE playground_parser
SET name = ?, display_name = ?, description = ?, author = ?, SET type = ?, name = ?, display_name = ?, description = ?, author = ?,
version = ?, match_pattern = ?, js_code = ?, update_time = NOW(), enabled = ? version = ?, match_pattern = ?, js_code = ?, update_time = NOW(), enabled = ?
WHERE id = ? WHERE id = ?
"""; """;
client.preparedQuery(sql) client.preparedQuery(sql)
.execute(Tuple.of( .execute(Tuple.of(
parser.getString("type"),
parser.getString("name"), parser.getString("name"),
parser.getString("displayName"), parser.getString("displayName"),
parser.getString("description"), parser.getString("description"),
@@ -182,6 +195,10 @@ public class DbServiceImpl implements DbService {
id id
)) ))
.onSuccess(res -> { .onSuccess(res -> {
if (res.rowCount() == 0) {
promise.complete(JsonResult.error("解析器不存在").toJsonObject());
return;
}
promise.complete(JsonResult.success("更新成功").toJsonObject()); promise.complete(JsonResult.success("更新成功").toJsonObject());
}) })
.onFailure(e -> { .onFailure(e -> {
@@ -230,6 +247,27 @@ public class DbServiceImpl implements DbService {
return promise.future(); return promise.future();
} }
@Override
public Future<Boolean> playgroundParserTypeExists(String type, Long excludeId) {
JDBCPool client = JDBCPoolInit.instance().getPool();
Promise<Boolean> promise = Promise.promise();
String sql = excludeId == null
? "SELECT COUNT(*) as count FROM playground_parser WHERE type = ?"
: "SELECT COUNT(*) as count FROM playground_parser WHERE type = ? AND id <> ?";
Tuple params = excludeId == null ? Tuple.of(type) : Tuple.of(type, excludeId);
client.preparedQuery(sql).execute(params).onSuccess(rows -> {
Integer count = rows.iterator().next().getInteger("count");
promise.complete(count != null && count > 0);
}).onFailure(e -> {
log.error("playgroundParserTypeExists failed", e);
promise.fail(e);
});
return promise.future();
}
@Override @Override
public Future<JsonObject> getPlaygroundParserById(Long id) { public Future<JsonObject> getPlaygroundParserById(Long id) {
JDBCPool client = JDBCPoolInit.instance().getPool(); JDBCPool client = JDBCPoolInit.instance().getPool();
@@ -242,28 +280,7 @@ public class DbServiceImpl implements DbService {
.onSuccess(rows -> { .onSuccess(rows -> {
if (rows.size() > 0) { if (rows.size() > 0) {
Row row = rows.iterator().next(); Row row = rows.iterator().next();
JsonObject parser = new JsonObject(); promise.complete(JsonResult.data(toPlaygroundParserJson(row, true)).toJsonObject());
parser.put("id", row.getLong("id"));
parser.put("name", row.getString("name"));
parser.put("type", row.getString("type"));
parser.put("displayName", row.getString("display_name"));
parser.put("description", row.getString("description"));
parser.put("author", row.getString("author"));
parser.put("version", row.getString("version"));
parser.put("matchPattern", row.getString("match_pattern"));
parser.put("jsCode", row.getString("js_code"));
parser.put("ip", row.getString("ip"));
// 将LocalDateTime转换为字符串格式避免序列化为数组
var createTime = row.getLocalDateTime("create_time");
if (createTime != null) {
parser.put("createTime", createTime.toString().replace("T", " "));
}
var updateTime = row.getLocalDateTime("update_time");
if (updateTime != null) {
parser.put("updateTime", updateTime.toString().replace("T", " "));
}
parser.put("enabled", row.getBoolean("enabled"));
promise.complete(JsonResult.data(parser).toJsonObject());
} else { } else {
promise.fail("解析器不存在"); promise.fail("解析器不存在");
} }
@@ -276,6 +293,32 @@ public class DbServiceImpl implements DbService {
return promise.future(); return promise.future();
} }
private JsonObject toPlaygroundParserJson(Row row, boolean includeJsCode) {
JsonObject parser = new JsonObject();
parser.put("id", row.getLong("id"));
parser.put("name", row.getString("name"));
parser.put("type", row.getString("type"));
parser.put("displayName", row.getString("display_name"));
parser.put("description", row.getString("description"));
parser.put("author", row.getString("author"));
parser.put("version", row.getString("version"));
parser.put("matchPattern", row.getString("match_pattern"));
if (includeJsCode) {
parser.put("jsCode", row.getString("js_code"));
}
parser.put("ip", row.getString("ip"));
var createTime = row.getLocalDateTime("create_time");
if (createTime != null) {
parser.put("createTime", createTime.toString().replace("T", " "));
}
var updateTime = row.getLocalDateTime("update_time");
if (updateTime != null) {
parser.put("updateTime", updateTime.toString().replace("T", " "));
}
parser.put("enabled", row.getBoolean("enabled"));
return parser;
}
// ========== 捐赠账号相关 ========== // ========== 捐赠账号相关 ==========
@Override @Override

View File

@@ -3,9 +3,11 @@ package cn.qaiu.lz.web.service.impl;
import cn.qaiu.db.pool.JDBCPoolInit; import cn.qaiu.db.pool.JDBCPoolInit;
import cn.qaiu.lz.web.service.ShoutService; import cn.qaiu.lz.web.service.ShoutService;
import cn.qaiu.vx.core.annotaions.Service; import cn.qaiu.vx.core.annotaions.Service;
import cn.qaiu.vx.core.util.VertxHolder;
import cn.qaiu.vx.core.model.JsonResult; import cn.qaiu.vx.core.model.JsonResult;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.jdbcclient.JDBCPool; import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.Tuple;
@@ -14,6 +16,8 @@ import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
@Service @Service
@@ -21,10 +25,18 @@ import java.util.Random;
public class ShoutServiceImpl implements ShoutService { public class ShoutServiceImpl implements ShoutService {
private static final int CODE_LENGTH = 6; private static final int CODE_LENGTH = 6;
private static final int EXPIRE_HOURS = 24; private static final int EXPIRE_HOURS = 24;
private static final long CLEANUP_INTERVAL_MILLIS = 3600_000L;
private static final long CLEANUP_SHUTDOWN_WAIT_MILLIS = 5_000L;
private static final AtomicBoolean CLEANUP_REGISTERED = new AtomicBoolean(false);
private static final AtomicInteger CLEANUP_IN_FLIGHT = new AtomicInteger(0);
private static final Object CLEANUP_MONITOR = new Object();
private static volatile Long cleanupTimerId;
private static volatile Vertx cleanupVertx;
private final JDBCPool jdbcPool = JDBCPoolInit.instance().getPool(); private final JDBCPool jdbcPool = JDBCPoolInit.instance().getPool();
@Override @Override
public Future<String> submitMessage(String content, String host) { public Future<String> submitMessage(String content, String host) {
registerCleanup();
Promise<String> promise = Promise.promise(); Promise<String> promise = Promise.promise();
String code = generateRandomCode(); String code = generateRandomCode();
// 判断一下当前code是否存在消息 // 判断一下当前code是否存在消息
@@ -50,6 +62,7 @@ public class ShoutServiceImpl implements ShoutService {
@Override @Override
public Future<JsonObject> retrieveMessage(String code) { public Future<JsonObject> retrieveMessage(String code) {
registerCleanup();
Promise<JsonObject> promise = Promise.promise(); Promise<JsonObject> promise = Promise.promise();
String sql = "SELECT content FROM t_messages WHERE code = ? AND expire_time > NOW()"; String sql = "SELECT content FROM t_messages WHERE code = ? AND expire_time > NOW()";
@@ -79,6 +92,109 @@ public class ShoutServiceImpl implements ShoutService {
jdbcPool.preparedQuery(sql).execute(Tuple.of(code)); jdbcPool.preparedQuery(sql).execute(Tuple.of(code));
} }
private static void registerCleanup() {
if (!CLEANUP_REGISTERED.compareAndSet(false, true)) {
return;
}
try {
Vertx vertx = VertxHolder.getVertxInstance();
cleanupVertx = vertx;
cleanupTimerId = vertx.setPeriodic(CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS,
id -> cleanupExpiredMessages());
cleanupExpiredMessages();
} catch (Exception e) {
cleanupTimerId = null;
cleanupVertx = null;
CLEANUP_REGISTERED.set(false);
log.warn("注册消息清理任务失败", e);
}
}
public static void cancelCleanup() {
Long timerId = cleanupTimerId;
Vertx vertx = cleanupVertx;
cleanupTimerId = null;
cleanupVertx = null;
CLEANUP_REGISTERED.set(false);
if (timerId == null || vertx == null) {
waitForCleanupToFinish();
return;
}
try {
if (vertx.cancelTimer(timerId)) {
log.info("消息定时清理任务已取消");
}
} catch (Exception e) {
log.warn("取消消息定时清理任务失败", e);
}
waitForCleanupToFinish();
}
private static void cleanupExpiredMessages() {
cleanupStarted();
boolean asyncCleanupStarted = false;
try {
if (!CLEANUP_REGISTERED.get()) {
return;
}
JDBCPoolInit poolInit = JDBCPoolInit.instance();
if (poolInit == null || poolInit.getPool() == null) {
log.debug("数据库连接池未就绪,跳过消息定时清理");
return;
}
JDBCPool pool = poolInit.getPool();
String sql = "DELETE FROM t_messages WHERE expire_time < NOW()";
Future<io.vertx.sqlclient.RowSet<io.vertx.sqlclient.Row>> cleanupFuture = pool.query(sql).execute();
asyncCleanupStarted = true;
cleanupFuture.onSuccess(res -> {
if (res.rowCount() > 0) {
log.info("清理过期消息 {} 条", res.rowCount());
}
})
.onFailure(e -> log.warn("清理过期消息失败", e))
.onComplete(ar -> cleanupFinished());
} catch (Exception e) {
log.warn("清理过期消息任务启动失败", e);
} finally {
if (!asyncCleanupStarted) {
cleanupFinished();
}
}
}
private static void cleanupStarted() {
CLEANUP_IN_FLIGHT.incrementAndGet();
}
private static void cleanupFinished() {
if (CLEANUP_IN_FLIGHT.decrementAndGet() <= 0) {
synchronized (CLEANUP_MONITOR) {
CLEANUP_MONITOR.notifyAll();
}
}
}
private static void waitForCleanupToFinish() {
long deadline = System.currentTimeMillis() + CLEANUP_SHUTDOWN_WAIT_MILLIS;
synchronized (CLEANUP_MONITOR) {
while (CLEANUP_IN_FLIGHT.get() > 0) {
long waitMillis = deadline - System.currentTimeMillis();
if (waitMillis <= 0) {
log.warn("等待消息定时清理结束超时,剩余任务数: {}", CLEANUP_IN_FLIGHT.get());
return;
}
try {
CLEANUP_MONITOR.wait(waitMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("等待消息定时清理结束被中断");
return;
}
}
}
}
private String generateRandomCode() { private String generateRandomCode() {
Random random = new Random(); Random random = new Random();
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@@ -17,7 +17,7 @@ proxyConf: server-proxy
# JS演练场配置 # JS演练场配置
playground: playground:
# 是否启用演练场默认false不启用 # 是否启用演练场默认false不启用
enabled: true enabled: false
# 公开模式默认false需要密码访问设为true则无需密码 # 公开模式默认false需要密码访问设为true则无需密码
public: false public: false
# 访问密码,建议修改默认密码! # 访问密码,建议修改默认密码!
@@ -86,12 +86,12 @@ cache:
ct: 30 # 城通网盘 ct: 30 # 城通网盘
ec: 5 # 移动云空间 ec: 5 # 移动云空间
fc: # 亿方云 fc: # 亿方云
fj: 20 # 小飞机网盘 fj: 20 # 小飞机网盘 链接有效期约30分钟
fs: # 飞书云盘 fs: # 飞书云盘
iz: 20 # 蓝奏云优享 iz: 20 # 蓝奏云优享 链接有效期约30分钟
kd: # 可道云 kd: # 可道云
le: 2879 # 联想乐云 le: 2879 # 联想乐云
lz: 20 # 蓝奏云 lz: 30 # 蓝奏云 链接有效期约45分钟
other: # 其他网盘 other: # 其他网盘
p115: 30 # 115网盘 p115: 30 # 115网盘
pdb: # Dropbox pdb: # Dropbox

View File

@@ -37,10 +37,10 @@
<!-- 将文件输出设置成异步输出 --> <!-- 将文件输出设置成异步输出 -->
<appender name="ASYNC-FILE" class="ch.qos.logback.classic.AsyncAppender"> <appender name="ASYNC-FILE" class="ch.qos.logback.classic.AsyncAppender">
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUGINFO级别日志 --> <!-- 队列剩余 20% 时开始丢弃 TRACE/DEBUG/INFO 级别日志,避免阻塞调用线程 -->
<discardingThreshold>0</discardingThreshold> <discardingThreshold>20</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 --> <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>256</queueSize> <queueSize>512</queueSize>
<!-- 添加附加的appender,最多只能添加一个 --> <!-- 添加附加的appender,最多只能添加一个 -->
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</appender> </appender>
@@ -55,7 +55,7 @@
<logger name="io.netty" level="warn"/> <logger name="io.netty" level="warn"/>
<logger name="io.vertx" level="info"/> <logger name="io.vertx" level="info"/>
<logger name="com.zaxxer.hikari" level="info"/> <logger name="com.zaxxer.hikari" level="info"/>
<logger name="cn.qaiu" level="debug"/> <logger name="cn.qaiu" level="${NFD_LOG_LEVEL:-info}"/>
<root level="info"> <root level="info">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
<!-- <appender-ref ref="FILE"/>--> <!-- <appender-ref ref="FILE"/>-->