diff --git a/web-service/src/main/java/cn/qaiu/lz/AppMain.java b/web-service/src/main/java/cn/qaiu/lz/AppMain.java index d60090c..c3e0df0 100644 --- a/web-service/src/main/java/cn/qaiu/lz/AppMain.java +++ b/web-service/src/main/java/cn/qaiu/lz/AppMain.java @@ -3,9 +3,11 @@ package cn.qaiu.lz; import cn.qaiu.WebClientVertxInit; import cn.qaiu.db.pool.JDBCPoolInit; import cn.qaiu.lz.common.cache.CacheConfigLoader; +import cn.qaiu.lz.common.cache.CacheManager; import cn.qaiu.lz.common.interceptorImpl.RateLimiter; import cn.qaiu.lz.web.config.PlaygroundConfig; import cn.qaiu.lz.web.service.DbService; +import cn.qaiu.lz.web.service.impl.ShoutServiceImpl; import cn.qaiu.parser.custom.CustomParserConfig; import cn.qaiu.parser.custom.CustomParserRegistry; import cn.qaiu.parser.customjs.JsScriptMetadataParser; @@ -20,6 +22,7 @@ import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.core.shareddata.LocalMap; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateFormatUtils; +import org.slf4j.LoggerFactory; import java.nio.file.Files; import java.nio.file.Path; @@ -38,22 +41,45 @@ import static cn.qaiu.vx.core.util.ConfigConstant.LOCAL; public class AppMain { public static void main(String[] args) { - // 先注册 ShutdownHook(JVM 逆序执行,先注册的后执行) - // 确保关闭顺序:Vert.x -> JDBCPoolInit -> JsParserExecutor - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - JDBCPoolInit.instance().close(); - } catch (Exception e) { - // ignore - } - try { - cn.qaiu.parser.customjs.JsParserExecutor.shutdownExecutor(); - } catch (Exception e) { - // ignore - } - })); + applyRuntimeLogLevelOverride(); + Deploy deploy = Deploy.instance(); + // 先阻断应用级定时任务,再让 Vert.x 停入口和 verticle。 + deploy.addPreShutdownTask(CacheManager::cancelPeriodicCleanup); + deploy.addPreShutdownTask(ShoutServiceImpl::cancelCleanup); + // Vert.x 停完后再关数据库和解析器共享资源,避免请求还在路上就先关底层 client。 + deploy.addPostShutdownTask(() -> JDBCPoolInit.instance().close()); + deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsParserExecutor::shutdownExecutor); + deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsPlaygroundExecutor::shutdownPools); + deploy.addPostShutdownTask(cn.qaiu.parser.customjs.JsHttpClient::shutdownSharedClient); + deploy.addPostShutdownTask(cn.qaiu.parser.PanBase::shutdownSharedClients); + deploy.addPostShutdownTask(cn.qaiu.parser.IPanTool::shutdownCloseAfterScheduler); + deploy.addPostShutdownTask(cn.qaiu.parser.impl.PodTool::shutdownWorkerExecutor); // start - Deploy.instance().start(args, AppMain::exec); + deploy.start(args, AppMain::exec); + } + + private static void applyRuntimeLogLevelOverride() { + String levelName = System.getProperty("NFD_LOG_LEVEL"); + if (levelName == null || levelName.isBlank()) { + levelName = System.getenv("NFD_LOG_LEVEL"); + } + if (levelName == null || levelName.isBlank()) { + return; + } + try { + var level = ch.qos.logback.classic.Level.toLevel(levelName, null); + if (level == null) { + log.warn("忽略无效的 NFD_LOG_LEVEL: {}", levelName); + return; + } + var logger = LoggerFactory.getLogger("cn.qaiu"); + if (logger instanceof ch.qos.logback.classic.Logger logbackLogger) { + logbackLogger.setLevel(level); + log.info("cn.qaiu 日志级别已覆盖为 {}", level); + } + } catch (Exception e) { + log.warn("覆盖 cn.qaiu 日志级别失败: {}", e.getMessage()); + } } /** @@ -65,6 +91,8 @@ public class AppMain { private static void exec(JsonObject jsonObject) { WebClientVertxInit.init(VertxHolder.getVertxInstance()); DatabindCodec.mapper().registerModule(new JavaTimeModule()); + // 演练场配置要先加载,后续启动流程才能按开关决定是否注册动态解析器。 + PlaygroundConfig.loadFromJson(jsonObject); // 限流 if (jsonObject.containsKey("rateLimit")) { JsonObject rateLimit = jsonObject.getJsonObject("rateLimit"); @@ -108,7 +136,12 @@ public class AppMain { } catch (Exception e) { log.warn("读取代理配置失败,使用默认页面地址: {}", e.getMessage()); } - loadPlaygroundParsers(pageAddr); + if (PlaygroundConfig.getInstance().isEnabled()) { + loadPlaygroundParsers(pageAddr); + } else { + log.info("演练场功能已禁用,跳过加载演练场解析器"); + log.info("服务已启动,可通过 {} 访问页面", pageAddr); + } System.out.println("启动成功: \n本地服务地址: " + addr); }); }); @@ -142,9 +175,6 @@ public class AppMain { JsonObject auths = jsonObject.getJsonObject(ConfigConstant.AUTHS); localMap.put(ConfigConstant.AUTHS, auths); } - - // 演练场配置 - PlaygroundConfig.loadFromJson(jsonObject); } /** @@ -153,34 +183,31 @@ public class AppMain { private static void loadPlaygroundParsers(String accessAddr) { DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class); - dbService.getPlaygroundParserList().onSuccess(result -> { + dbService.getEnabledPlaygroundParsersForLoad().onSuccess(result -> { JsonArray parsers = result.getJsonArray("data"); if (parsers != null) { int loadedCount = 0; for (int i = 0; i < parsers.size(); i++) { JsonObject parser = parsers.getJsonObject(i); - // 只注册已启用的解析器 - if (parser.getBoolean("enabled", false)) { - try { - String jsCode = parser.getString("jsCode"); - if (jsCode == null || jsCode.trim().isEmpty()) { - log.error("加载演练场解析器失败: {} - JavaScript代码为空", parser.getString("name")); - continue; - } - CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode); - CustomParserRegistry.register(config); - loadedCount++; - log.info("已加载演练场解析器: {} ({})", - config.getDisplayName(), config.getType()); - } catch (Exception e) { - String parserName = parser.getString("name"); - String errorMsg = e.getMessage(); - log.error("加载演练场解析器失败: {} - {}", parserName, errorMsg, e); - // 如果是require相关错误,提供更详细的提示 - if (errorMsg != null && errorMsg.contains("require")) { - log.error("提示:演练场解析器不支持CommonJS模块系统(require),请确保代码使用ES5.1语法"); - } + try { + String jsCode = parser.getString("jsCode"); + if (jsCode == null || jsCode.trim().isEmpty()) { + log.error("加载演练场解析器失败: {} - JavaScript代码为空", parser.getString("name")); + continue; + } + CustomParserConfig config = JsScriptMetadataParser.parseScript(jsCode); + CustomParserRegistry.register(config); + loadedCount++; + log.info("已加载演练场解析器: {} ({})", + config.getDisplayName(), config.getType()); + } catch (Exception e) { + String parserName = parser.getString("name"); + String errorMsg = e.getMessage(); + log.error("加载演练场解析器失败: {} - {}", parserName, errorMsg, e); + // 如果是require相关错误,提供更详细的提示 + if (errorMsg != null && errorMsg.contains("require")) { + log.error("提示:演练场解析器不支持CommonJS模块系统(require),请确保代码使用ES5.1语法"); } } } diff --git a/web-service/src/main/java/cn/qaiu/lz/common/cache/CacheManager.java b/web-service/src/main/java/cn/qaiu/lz/common/cache/CacheManager.java index 947670d..df447ab 100644 --- a/web-service/src/main/java/cn/qaiu/lz/common/cache/CacheManager.java +++ b/web-service/src/main/java/cn/qaiu/lz/common/cache/CacheManager.java @@ -5,8 +5,10 @@ import cn.qaiu.db.pool.JDBCType; import cn.qaiu.lz.web.model.CacheLinkInfo; import cn.qaiu.lz.web.model.PanFileInfo; import cn.qaiu.lz.web.model.PanFileInfoRowMapper; +import cn.qaiu.vx.core.util.VertxHolder; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.Row; @@ -18,14 +20,24 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class CacheManager { + private static final int MAX_SHARE_KEY_LENGTH = 1024; + private final Pool jdbcPool = JDBCPoolInit.instance().getPool(); private final JDBCType jdbcType = JDBCPoolInit.instance().getType(); private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class); public Future get(String cacheKey) { + if (isOversizedShareKey(cacheKey)) { + LOGGER.warn("缓存key过长,跳过缓存读取: length={}, prefix={}", + cacheKey.length(), previewShareKey(cacheKey)); + return Future.succeededFuture(new CacheLinkInfo(JsonObject.of("cacheHit", false, "shareKey", cacheKey))); + } + String sql = "SELECT share_key as shareKey, direct_link as directLink, expiration FROM cache_link_info WHERE share_key = #{share_key}"; String sql2 = "SELECT * FROM pan_file_info WHERE share_key = #{share_key}"; Map params = new HashMap<>(); @@ -65,6 +77,17 @@ public class CacheManager { // 插入或更新缓存数据 public void cacheShareLink(CacheLinkInfo cacheLinkInfo) { + if (cacheLinkInfo == null) { + LOGGER.warn("缓存信息为空,跳过缓存写入"); + return; + } + if (isOversizedShareKey(cacheLinkInfo.getShareKey())) { + String shareKey = cacheLinkInfo.getShareKey(); + LOGGER.warn("缓存key过长,跳过缓存写入: length={}, prefix={}", + shareKey.length(), previewShareKey(shareKey)); + return; + } + String sql; if (jdbcType == JDBCType.MySQL) { sql = """ @@ -125,12 +148,19 @@ public class CacheManager { } }).onFailure(e -> LOGGER.error("文件信息插入失败", e)); } - }); + }) + .onFailure(e -> LOGGER.error("查询文件信息缓存失败: shareKey={}", cacheLinkInfo.getShareKey(), e)); } } // 写入网盘厂商API解析次数 public Future updateTotalByField(String shareKey, CacheTotalField field) { + if (isOversizedShareKey(shareKey)) { + LOGGER.warn("缓存key过长,跳过统计写入: length={}, prefix={}", + shareKey.length(), previewShareKey(shareKey)); + return Future.succeededFuture(0); + } + Promise promise = Promise.promise(); String fieldLower = field.name().toLowerCase(); String sql; @@ -179,6 +209,12 @@ public class CacheManager { } public Future getShareKeyTotal(String shareKey, String name) { + if (isOversizedShareKey(shareKey)) { + LOGGER.warn("缓存key过长,跳过统计读取: length={}, prefix={}", + shareKey.length(), previewShareKey(shareKey)); + return Future.succeededFuture(null); + } + String sql = """ SELECT `share_key`, SUM({total_name}) AS sum_num FROM `api_statistics_info` @@ -205,21 +241,56 @@ public class CacheManager { /** * 清理过期缓存记录,防止数据库无限增长 - * @return 删除的行数 + * 包括: + * 1. 清理 cache_link_info 中过期的记录 + * 2. 清理 pan_file_info 中孤立的记录(对应的 cache_link_info 已被删除) + * @return 删除的总行数 */ public Future cleanupExpiredCache() { - String sql = "DELETE FROM cache_link_info WHERE expiration > 0 AND expiration < #{now}"; - Map params = new HashMap<>(); - params.put("now", System.currentTimeMillis()); Promise promise = Promise.promise(); - SqlTemplate.forUpdate(jdbcPool, sql) + long now = System.currentTimeMillis(); + + // 第一步:清理 cache_link_info 中过期的记录 + String sqlDeleteExpired = "DELETE FROM cache_link_info WHERE expiration > 0 AND expiration < #{now}"; + Map params = new HashMap<>(); + params.put("now", now); + + SqlTemplate.forUpdate(jdbcPool, sqlDeleteExpired) .execute(params) .onSuccess(res -> { - int deleted = res.rowCount(); - if (deleted > 0) { - LOGGER.info("清理过期缓存记录 {} 条", deleted); + int deletedCache = res.rowCount(); + if (deletedCache > 0) { + LOGGER.info("清理过期缓存记录 {} 条", deletedCache); } - promise.complete(deleted); + + // 第二步:清理 pan_file_info 中孤立的记录 + // 使用 share_key 关联,create_time 使用字符串格式比较(yyyy-MM-dd HH:mm:ss) + String sqlDeleteOrphans = """ + DELETE FROM pan_file_info + WHERE share_key NOT IN ( + SELECT DISTINCT share_key FROM cache_link_info WHERE share_key IS NOT NULL + ) + AND (create_time IS NULL OR create_time < #{thresholdTime}) + """; + Map orphanParams = new HashMap<>(); + // 计算1天前的时间,转换为 yyyy-MM-dd HH:mm:ss 格式 + java.time.LocalDateTime thresholdTime = java.time.LocalDateTime.now().minusDays(1); + orphanParams.put("thresholdTime", thresholdTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + + SqlTemplate.forUpdate(jdbcPool, sqlDeleteOrphans) + .execute(orphanParams) + .onSuccess(res2 -> { + int deletedOrphans = res2.rowCount(); + if (deletedOrphans > 0) { + LOGGER.info("清理孤立文件信息记录 {} 条", deletedOrphans); + } + promise.complete(deletedCache + deletedOrphans); + }) + .onFailure(e -> { + LOGGER.warn("清理孤立文件信息记录失败(不影响主流程)", e); + // 即使孤立记录清理失败,也返回已删除的缓存记录数 + promise.complete(deletedCache); + }); }) .onFailure(e -> { LOGGER.error("清理过期缓存失败", e); @@ -232,31 +303,121 @@ public class CacheManager { * 注册定时清理过期缓存任务(每小时执行一次) * 应在应用启动后调用 */ - private static volatile boolean cleanupRegistered = false; + private static final long CLEANUP_INTERVAL_MILLIS = 3600_000L; + private static final long CLEANUP_SHUTDOWN_WAIT_MILLIS = 5_000L; + private static final AtomicBoolean CLEANUP_REGISTERED = new AtomicBoolean(false); + private static final AtomicInteger CLEANUP_IN_FLIGHT = new AtomicInteger(0); + private static final Object CLEANUP_MONITOR = new Object(); + private static volatile Long cleanupTimerId; + private static volatile Vertx cleanupVertx; public static void registerPeriodicCleanup() { - if (cleanupRegistered) return; + if (!CLEANUP_REGISTERED.compareAndSet(false, true)) { + return; + } try { - io.vertx.core.Vertx vertx = cn.qaiu.vx.core.util.VertxHolder.getVertxInstance(); - if (vertx == null) { - LOGGER.warn("Vertx 未就绪,缓存定时清理任务延迟注册"); - return; - } - cleanupRegistered = true; - vertx.setPeriodic(3600_000, 3600_000, id -> { - try { - new CacheManager().cleanupExpiredCache(); - } catch (Exception e) { - LOGGER.warn("定时清理缓存任务跳过(数据库可能未就绪)", e); - } - }); + Vertx vertx = VertxHolder.getVertxInstance(); + cleanupVertx = vertx; + cleanupTimerId = vertx.setPeriodic(CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, + id -> cleanupExpiredCacheSafely()); LOGGER.info("缓存定时清理任务已注册(每小时执行)"); } catch (Exception e) { + cleanupTimerId = null; + cleanupVertx = null; + CLEANUP_REGISTERED.set(false); LOGGER.warn("注册缓存定时清理任务失败", e); } } + public static void cancelPeriodicCleanup() { + Long timerId = cleanupTimerId; + Vertx vertx = cleanupVertx; + cleanupTimerId = null; + cleanupVertx = null; + CLEANUP_REGISTERED.set(false); + + if (timerId == null || vertx == null) { + waitForCleanupToFinish(); + return; + } + try { + if (vertx.cancelTimer(timerId)) { + LOGGER.info("缓存定时清理任务已取消"); + } + } catch (Exception e) { + LOGGER.warn("取消缓存定时清理任务失败", e); + } + waitForCleanupToFinish(); + } + + private static void cleanupExpiredCacheSafely() { + cleanupStarted(); + boolean asyncCleanupStarted = false; + try { + if (!CLEANUP_REGISTERED.get()) { + return; + } + JDBCPoolInit poolInit = JDBCPoolInit.instance(); + if (poolInit == null || poolInit.getPool() == null) { + LOGGER.debug("数据库连接池未就绪,跳过缓存定时清理"); + return; + } + Future cleanupFuture = new CacheManager().cleanupExpiredCache(); + asyncCleanupStarted = true; + cleanupFuture.onComplete(ar -> { + if (ar.failed()) { + LOGGER.warn("定时清理缓存失败", ar.cause()); + } + cleanupFinished(); + }); + } catch (Exception e) { + LOGGER.warn("定时清理缓存任务跳过(数据库可能正在关闭)", e); + } finally { + if (!asyncCleanupStarted) { + cleanupFinished(); + } + } + } + + private static void cleanupStarted() { + CLEANUP_IN_FLIGHT.incrementAndGet(); + } + + private static void cleanupFinished() { + if (CLEANUP_IN_FLIGHT.decrementAndGet() <= 0) { + synchronized (CLEANUP_MONITOR) { + CLEANUP_MONITOR.notifyAll(); + } + } + } + + private static void waitForCleanupToFinish() { + long deadline = System.currentTimeMillis() + CLEANUP_SHUTDOWN_WAIT_MILLIS; + synchronized (CLEANUP_MONITOR) { + while (CLEANUP_IN_FLIGHT.get() > 0) { + long waitMillis = deadline - System.currentTimeMillis(); + if (waitMillis <= 0) { + LOGGER.warn("等待缓存定时清理结束超时,剩余任务数: {}", CLEANUP_IN_FLIGHT.get()); + return; + } + try { + CLEANUP_MONITOR.wait(waitMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("等待缓存定时清理结束被中断"); + return; + } + } + } + } + public Future> getShareKeyTotal(String shareKey) { + if (isOversizedShareKey(shareKey)) { + LOGGER.warn("缓存key过长,跳过统计读取: length={}, prefix={}", + shareKey.length(), previewShareKey(shareKey)); + return Future.succeededFuture(null); + } + String sql = """ SELECT `share_key`, SUM(cache_hit_total) AS hit_total, SUM(api_parser_total) AS parser_total FROM `api_statistics_info` @@ -287,4 +448,16 @@ public class CacheManager { return promise.future(); } + private static boolean isOversizedShareKey(String shareKey) { + return shareKey != null && shareKey.length() > MAX_SHARE_KEY_LENGTH; + } + + private static String previewShareKey(String shareKey) { + if (shareKey == null) { + return ""; + } + int maxLength = 120; + return shareKey.length() <= maxLength ? shareKey : shareKey.substring(0, maxLength) + "..."; + } + } diff --git a/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/DefaultInterceptor.java b/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/DefaultInterceptor.java index 907df26..b1e1100 100644 --- a/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/DefaultInterceptor.java +++ b/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/DefaultInterceptor.java @@ -9,6 +9,10 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import static cn.qaiu.vx.core.util.ConfigConstant.IGNORES_REG; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; @@ -19,7 +23,15 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; @HandleSortFilter(1) public class DefaultInterceptor implements BeforeInterceptor { - protected final JsonArray ignores = SharedDataUtil.getJsonArrayForCustomConfig(IGNORES_REG); + /** 预编译的忽略路径正则列表,避免每次请求重新编译 */ + protected final List ignorePatterns; + + public DefaultInterceptor() { + JsonArray ignores = SharedDataUtil.getJsonArrayForCustomConfig(IGNORES_REG); + this.ignorePatterns = ignores.stream() + .map(obj -> Pattern.compile(obj.toString())) + .collect(Collectors.toList()); + } @Override public void handle(RoutingContext ctx) { @@ -38,28 +50,30 @@ public class DefaultInterceptor implements BeforeInterceptor { // limit: 1000 // # 限流的时间窗口(单位秒) // timeWindow: 60 - if (rateLimit.getBoolean("enable")) { - // 获取当前请求的路径 - String path = ctx.request().path(); - // 正则匹配路径 - if (ignores.stream().anyMatch(ignore -> path.matches(ignore.toString()))) { - // 如果匹配到忽略的路径,则不进行限流 - doNext(ctx); - return; - } - RateLimiter.checkRateLimit(ctx.request()) - .onSuccess(v -> { - // 继续执行下一个拦截器 - doNext(ctx); - }) - .onFailure(t -> { - // 限流失败,返回错误响应 - log.warn("Rate limit exceeded for path: {}", path); - ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8") - .setStatusCode(429) - .end(t.getMessage()); - }); + if (!rateLimit.getBoolean("enable", false)) { + doNext(ctx); + return; } + // 获取当前请求的路径 + String path = ctx.request().path(); + // 正则匹配路径 + if (ignorePatterns.stream().anyMatch(pattern -> pattern.matcher(path).matches())) { + // 如果匹配到忽略的路径,则不进行限流 + doNext(ctx); + return; + } + RateLimiter.checkRateLimit(ctx.request()) + .onSuccess(v -> { + // 继续执行下一个拦截器 + doNext(ctx); + }) + .onFailure(t -> { + // 限流失败,返回错误响应 + log.warn("Rate limit exceeded for path: {}", path); + ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8") + .setStatusCode(429) + .end(t.getMessage()); + }); } } diff --git a/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/RateLimiter.java b/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/RateLimiter.java index fa3937e..74c91dd 100644 --- a/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/RateLimiter.java +++ b/web-service/src/main/java/cn/qaiu/lz/common/interceptorImpl/RateLimiter.java @@ -17,12 +17,19 @@ public class RateLimiter { private static final Map ipRequestMap = new ConcurrentHashMap<>(); private static int MAX_REQUESTS = 10; // 最大请求次数 + private static int MAX_ENTRIES = 10_000; private static long TIME_WINDOW = 60 * 1000; // 时间窗口(毫秒) - private static String PATH_REG; // 限流路径正则 + private static String PATH_REG = "/.*"; // 限流路径正则(默认匹配所有路径) + + // 上次清理时间 + private static volatile long lastCleanupTime = System.currentTimeMillis(); + // 清理间隔(30秒) + private static final long CLEANUP_INTERVAL = 30 * 1000; public static void init(JsonObject rateLimitConfig) { MAX_REQUESTS = rateLimitConfig.getInteger("limit", 10); + MAX_ENTRIES = rateLimitConfig.getInteger("maxEntries", 10_000); TIME_WINDOW = rateLimitConfig.getInteger("timeWindow", 60) * 1000L; // 转换为毫秒 PATH_REG = rateLimitConfig.getString("pathReg", "/.*"); log.info("RateLimiter initialized with max requests: {}, time window: {} ms, path regex: {}", @@ -39,28 +46,37 @@ public class RateLimiter { String ip = request.remoteAddress().host(); - // 定期清理过期条目,防止 Map 无限增长 - if (ipRequestMap.size() > 1000) { - long now = System.currentTimeMillis(); - ipRequestMap.entrySet().removeIf(entry -> now - entry.getValue().timestamp > TIME_WINDOW); + // 基于时间间隔的清理策略,避免 Map 无限增长 + long now = System.currentTimeMillis(); + if (now - lastCleanupTime > CLEANUP_INTERVAL) { + cleanupExpiredEntries(now, false); } - - RequestInfo info = ipRequestMap.compute(ip, (key, requestInfo) -> { - long currentTime = System.currentTimeMillis(); - if (requestInfo == null || currentTime - requestInfo.timestamp > TIME_WINDOW) { - // 初始化或重置计数器 - return new RequestInfo(1, currentTime); - } else { - // 增加计数器 - requestInfo.count.incrementAndGet(); - return requestInfo; + RequestInfo info; + synchronized (RateLimiter.class) { + if (!ipRequestMap.containsKey(ip) && ipRequestMap.size() >= MAX_ENTRIES) { + cleanupExpiredEntries(now, true); + if (ipRequestMap.size() >= MAX_ENTRIES) { + promise.fail("限流记录过多,请稍后再试。"); + return promise.future(); + } } - }); + + info = ipRequestMap.compute(ip, (key, requestInfo) -> { + if (requestInfo == null || now - requestInfo.timestamp > TIME_WINDOW) { + // 初始化或重置计数器 + return new RequestInfo(1, now); + } else { + // 增加计数器 + requestInfo.count.incrementAndGet(); + return requestInfo; + } + }); + } if (info.count.get() > MAX_REQUESTS) { // 超过限制 // 计算剩余时间 - long remainingTime = TIME_WINDOW - (System.currentTimeMillis() - info.timestamp); + long remainingTime = TIME_WINDOW - (now - info.timestamp); BigDecimal bigDecimal = BigDecimal.valueOf(remainingTime / 1000.0) .setScale(2, RoundingMode.HALF_UP); promise.fail("请求次数太多了,请" + bigDecimal + "秒后再试。"); @@ -71,6 +87,27 @@ public class RateLimiter { return promise.future(); } + /** + * 清理过期的限流条目 + * 使用 synchronized 避免并发清理 + */ + private static synchronized void cleanupExpiredEntries(long now, boolean force) { + // 双重检查,避免重复清理 + if (!force && now - lastCleanupTime <= CLEANUP_INTERVAL) { + return; + } + lastCleanupTime = now; + + int sizeBefore = ipRequestMap.size(); + if (sizeBefore > 0) { + ipRequestMap.entrySet().removeIf(entry -> now - entry.getValue().timestamp > TIME_WINDOW); + int sizeAfter = ipRequestMap.size(); + if (sizeBefore > 100 || sizeAfter != sizeBefore) { + log.debug("RateLimiter 清理过期条目: {} -> {}", sizeBefore, sizeAfter); + } + } + } + private static class RequestInfo { final AtomicInteger count; volatile long timestamp; diff --git a/web-service/src/main/java/cn/qaiu/lz/web/config/PlaygroundConfig.java b/web-service/src/main/java/cn/qaiu/lz/web/config/PlaygroundConfig.java index 2049bc5..2b03220 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/config/PlaygroundConfig.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/config/PlaygroundConfig.java @@ -19,6 +19,7 @@ public class PlaygroundConfig { /** Token有效期:24小时 */ private static final long TOKEN_EXPIRY_MS = 24 * 60 * 60 * 1000L; + private static final int MAX_TOKEN_COUNT = 1024; private static final SecureRandom SECURE_RANDOM = new SecureRandom(); @@ -59,10 +60,10 @@ public class PlaygroundConfig { /** * 生成并存储一个新的认证Token,同时清理过期Token */ - public String generateToken() { - // 清理过期Token,防止Map无限增长 + public synchronized String generateToken() { long now = System.currentTimeMillis(); - validTokens.entrySet().removeIf(e -> now - e.getValue() > TOKEN_EXPIRY_MS); + cleanupExpiredTokens(now); + evictOldestTokensIfNeeded(); // 使用SecureRandom生成32字节的密码学安全Token byte[] bytes = new byte[32]; SECURE_RANDOM.nextBytes(bytes); @@ -78,16 +79,39 @@ public class PlaygroundConfig { /** * 校验Token是否合法且未过期 */ - public boolean validateToken(String token) { + public synchronized boolean validateToken(String token) { if (token == null || token.isEmpty()) return false; + long now = System.currentTimeMillis(); + cleanupExpiredTokens(now); Long createdAt = validTokens.get(token); if (createdAt == null) return false; - if (System.currentTimeMillis() - createdAt > TOKEN_EXPIRY_MS) { + if (now - createdAt > TOKEN_EXPIRY_MS) { validTokens.remove(token); return false; } return true; } + + private void cleanupExpiredTokens(long now) { + validTokens.entrySet().removeIf(e -> now - e.getValue() > TOKEN_EXPIRY_MS); + } + + private void evictOldestTokensIfNeeded() { + while (validTokens.size() >= MAX_TOKEN_COUNT) { + String oldestToken = null; + long oldestTime = Long.MAX_VALUE; + for (Map.Entry entry : validTokens.entrySet()) { + if (entry.getValue() < oldestTime) { + oldestTime = entry.getValue(); + oldestToken = entry.getKey(); + } + } + if (oldestToken == null) { + return; + } + validTokens.remove(oldestToken); + } + } /** * 获取单例实例 @@ -125,5 +149,13 @@ public class PlaygroundConfig { } else { log.info("未找到playground配置,使用默认值: enabled=false, public=false"); } + String enabledOverride = System.getProperty("NFD_PLAYGROUND_ENABLED"); + if (enabledOverride == null || enabledOverride.isBlank()) { + enabledOverride = System.getenv("NFD_PLAYGROUND_ENABLED"); + } + if (enabledOverride != null && !enabledOverride.isBlank()) { + cfg.enabled = Boolean.parseBoolean(enabledOverride); + log.info("Playground enabled 已被 NFD_PLAYGROUND_ENABLED 覆盖为 {}", cfg.enabled); + } } } diff --git a/web-service/src/main/java/cn/qaiu/lz/web/controller/ParserApi.java b/web-service/src/main/java/cn/qaiu/lz/web/controller/ParserApi.java index 5c6f493..cb925c6 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/controller/ParserApi.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/controller/ParserApi.java @@ -13,6 +13,7 @@ import cn.qaiu.lz.web.model.LinkInfoResp; import cn.qaiu.lz.web.model.StatisticsInfo; import cn.qaiu.lz.web.service.DbService; import cn.qaiu.parser.PanDomainTemplate; +import cn.qaiu.parser.IPanTool; import cn.qaiu.parser.ParserCreate; import cn.qaiu.parser.clientlink.ClientLinkType; import cn.qaiu.vx.core.annotaions.RouteHandler; @@ -76,11 +77,30 @@ public class ParserApi { private static final CacheManager cacheManager = new CacheManager(); private static final ServerApi serverApi = new ServerApi(); + @RouteMapping(value = "/check/:type/:key", method = RouteMethod.GET) + public void check(HttpServerResponse response, String type, String key) { + response.putHeader("Content-Type", "text/plain; charset=utf-8") + .setStatusCode(200) + .end("ok"); + } + + @RouteMapping(value = "/check/:type/:key", method = RouteMethod.HEAD) + public void checkHead(HttpServerResponse response, String type, String key) { + response.putHeader("Content-Type", "text/plain; charset=utf-8") + .setStatusCode(200) + .end(); + } + @RouteMapping(value = "/linkInfo", method = RouteMethod.GET) public Future parse(HttpServerRequest request, String pwd, String auth) { Promise promise = Promise.promise(); String url = URLParamUtil.parserParams(request); - ParserCreate parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); + ParserCreate parserCreate; + try { + parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); + } catch (Exception e) { + return Future.failedFuture(e); + } ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo(); // 构建链接信息响应,如果有 auth 参数则附加到链接中 @@ -141,7 +161,12 @@ public class ParserApi { @RouteMapping("/getFileList") public Future> getFileList(HttpServerRequest request, String pwd, String dirId, String uuid) { String url = URLParamUtil.parserParams(request); - ParserCreate parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); + ParserCreate parserCreate; + try { + parserCreate = ParserCreate.fromShareUrl(url).setShareLinkInfoPwd(pwd); + } catch (Exception e) { + return Future.failedFuture(e); + } String linkPrefix = getLinkPrefix(request); parserCreate.getShareLinkInfo().getOtherParam().put("domainName", linkPrefix); parserCreate.getShareLinkInfo().getOtherParam().put("_requestOrigin", linkPrefix); @@ -151,7 +176,8 @@ public class ParserApi { if (StringUtils.isNotBlank(uuid)) { parserCreate.getShareLinkInfo().getOtherParam().put("uuid", uuid); } - return parserCreate.createTool().parseFileList(); + IPanTool tool = parserCreate.createTool(); + return IPanTool.closeAfter(tool, tool::parseFileList); } // 目录解析下载文件 @@ -174,7 +200,8 @@ public class ParserApi { String linkPrefix = getLinkPrefix(request); shareLinkInfo.getOtherParam().put("domainName", linkPrefix); shareLinkInfo.getOtherParam().put("_requestOrigin", linkPrefix); - return parserCreate.createTool().parseById(); + IPanTool tool = parserCreate.createTool(); + return IPanTool.closeAfter(tool, tool::parseById); } @RouteMapping("/redirectUrl/:type/:param") @@ -183,10 +210,9 @@ public class ParserApi { getFileDownUrl(request, type, param) .onSuccess(res -> { - ResponseUtil.redirect(response, res); - promise.complete(); + ResponseUtil.redirect(response, res, promise); }) - .onFailure(t -> promise.fail(t.fillInStackTrace())); + .onFailure(promise::tryFail); return promise.future(); } @@ -212,7 +238,7 @@ public class ParserApi { } String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL"); - serverApi.parseKeyJson(request, type, key).onSuccess(res -> { + serverApi.parseKeyJsonForRedirect(request, type, key).onSuccess(res -> { redirect(response, previewURL, res); }).onFailure(e -> { ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString())); @@ -248,7 +274,7 @@ public class ParserApi { } String previewURL = SharedDataUtil.getJsonStringForServerConfig("previewURL"); - serverApi.parseJson(request, pwd, null).onSuccess(res -> { + serverApi.parseJsonForRedirect(request, pwd, null).onSuccess(res -> { redirect(response, previewURL, res); }).onFailure(e -> { ResponseUtil.fireJsonResultResponse(response, JsonResult.error(e.toString())); @@ -264,10 +290,9 @@ public class ParserApi { getFileDownUrl(request, type, param) .onSuccess(res -> { String url = viewPrefix + URLEncoder.encode(res, StandardCharsets.UTF_8); - ResponseUtil.redirect(response, url); - promise.complete(); + ResponseUtil.redirect(response, url, promise); }) - .onFailure(t -> promise.fail(t.fillInStackTrace())); + .onFailure(promise::tryFail); return promise.future(); } @@ -320,7 +345,8 @@ public class ParserApi { } // 使用默认方法解析并生成客户端链接 - parserCreate.createTool().parseWithClientLinks() + IPanTool tool = parserCreate.createTool(); + IPanTool.closeAfter(tool, tool::parseWithClientLinks) .onSuccess(clientLinks -> { try { ClientLinkResp response = buildClientLinkResponse(shareLinkInfo, clientLinks); @@ -362,7 +388,8 @@ public class ParserApi { URLParamUtil.addParam(parserCreate); // 使用默认方法解析并生成客户端链接 - parserCreate.createTool().parseWithClientLinks() + IPanTool tool = parserCreate.createTool(); + IPanTool.closeAfter(tool, tool::parseWithClientLinks) .onSuccess(clientLinks -> { try { String clientLink = extractClientLinkByType(clientLinks, clientType); diff --git a/web-service/src/main/java/cn/qaiu/lz/web/controller/PlaygroundApi.java b/web-service/src/main/java/cn/qaiu/lz/web/controller/PlaygroundApi.java index 09ae694..71f956c 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/controller/PlaygroundApi.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/controller/PlaygroundApi.java @@ -123,6 +123,10 @@ public class PlaygroundApi { // 获取密码 JsonObject body = ctx.body().asJsonObject(); + if (body == null) { + promise.complete(JsonResult.error("请求体不能为空").toJsonObject()); + return promise.future(); + } String password = body.getString("password"); if (StringUtils.isBlank(password)) { @@ -249,82 +253,84 @@ public class PlaygroundApi { ShareLinkInfo shareLinkInfo = parserCreate.getShareLinkInfo(); // 创建演练场执行器 - JsPlaygroundExecutor executor = new JsPlaygroundExecutor(shareLinkInfo, jsCode); + final JsPlaygroundExecutor executor = new JsPlaygroundExecutor(shareLinkInfo, jsCode); // 根据方法类型选择执行,并异步处理结果 Future executionFuture; - switch (method) { - case "parse": - executionFuture = executor.executeParseAsync().map(r -> (Object) r); - break; - case "parseFileList": - executionFuture = executor.executeParseFileListAsync().map(r -> (Object) r); - break; - case "parseById": - executionFuture = executor.executeParseByIdAsync().map(r -> (Object) r); - break; - default: - promise.fail(new IllegalArgumentException("未知的方法类型: " + method)); - return promise.future(); + try { + switch (method) { + case "parse": + executionFuture = executor.executeParseAsync().map(r -> (Object) r); + break; + case "parseFileList": + executionFuture = executor.executeParseFileListAsync().map(r -> (Object) r); + break; + case "parseById": + executionFuture = executor.executeParseByIdAsync().map(r -> (Object) r); + break; + default: + executor.close(); + promise.fail(new IllegalArgumentException("未知的方法类型: " + method)); + return promise.future(); + } + } catch (Exception ex) { + // 同步异常路径:executor 已创建但 Future 未注册,需要手动关闭 + executor.close(); + throw ex; } - // 异步处理执行结果 - executionFuture.onSuccess(result -> { - log.debug("执行成功,结果类型: {}, 结果值: {}", - result != null ? result.getClass().getSimpleName() : "null", - result); - - // 获取日志 - List logEntries = executor.getLogs(); - log.debug("获取到 {} 条日志记录", logEntries.size()); - - List respLogs = logEntries.stream() - .map(entry -> PlaygroundTestResp.LogEntry.builder() - .level(entry.getLevel()) - .message(entry.getMessage()) - .timestamp(entry.getTimestamp()) - .source(entry.getSource()) // 使用日志条目的来源标识 - .build()) - .collect(Collectors.toList()); + // 异步处理执行结果,finally 中统一释放 executor,避免响应构建异常导致资源泄漏 + executionFuture.onComplete(ar -> { + try { + long executionTime = System.currentTimeMillis() - startTime; + List logEntries = executor.getLogs(); + List respLogs = logEntries.stream() + .map(entry -> PlaygroundTestResp.LogEntry.builder() + .level(entry.getLevel()) + .message(entry.getMessage()) + .timestamp(entry.getTimestamp()) + .source(entry.getSource()) + .build()) + .collect(Collectors.toList()); - long executionTime = System.currentTimeMillis() - startTime; + if (ar.succeeded()) { + Object result = ar.result(); + log.debug("执行成功,结果类型: {}, 结果摘要: {}", + result != null ? result.getClass().getSimpleName() : "null", + summarizeResult(result)); - // 构建响应 - PlaygroundTestResp response = PlaygroundTestResp.builder() - .success(true) - .result(result) - .logs(respLogs) - .executionTime(executionTime) - .build(); + PlaygroundTestResp response = PlaygroundTestResp.builder() + .success(true) + .result(result) + .logs(respLogs) + .executionTime(executionTime) + .build(); - JsonObject jsonResponse = JsonObject.mapFrom(response); - log.debug("测试成功响应: {}", jsonResponse.encodePrettily()); - promise.complete(jsonResponse); - }).onFailure(e -> { - long executionTime = System.currentTimeMillis() - startTime; - String errorMessage = e.getMessage(); + log.debug("测试成功响应: success=true, logCount={}", respLogs.size()); + promise.complete(JsonObject.mapFrom(response)); + } else { + Throwable e = ar.cause(); + String errorMessage = e == null ? "执行失败" : e.getMessage(); + log.error("演练场执行失败", e); - log.error("演练场执行失败", e); + PlaygroundTestResp response = PlaygroundTestResp.builder() + .success(false) + .error(errorMessage) + .executionTime(executionTime) + .logs(respLogs) + .build(); - // 尝试获取已有的日志 - List logEntries = executor.getLogs(); - List respLogs = logEntries.stream() - .map(entry -> PlaygroundTestResp.LogEntry.builder() - .level(entry.getLevel()) - .message(entry.getMessage()) - .timestamp(entry.getTimestamp()) - .source(entry.getSource()) // 使用日志条目的来源标识 - .build()) - .collect(Collectors.toList()); - - PlaygroundTestResp response = PlaygroundTestResp.builder() - .success(false) - .error(errorMessage) - .executionTime(executionTime) - .logs(respLogs) - .build(); - - promise.complete(JsonObject.mapFrom(response)); + promise.complete(JsonObject.mapFrom(response)); + } + } catch (Exception e) { + log.error("构建演练场响应失败", e); + promise.tryComplete(JsonObject.mapFrom(PlaygroundTestResp.builder() + .success(false) + .error("构建响应失败: " + e.getMessage()) + .build())); + } finally { + executor.close(); + } }); } catch (Exception e) { @@ -462,19 +468,7 @@ public class PlaygroundApi { } // 检查type是否已存在 - dbService.getPlaygroundParserList().onSuccess(listResult -> { - var list = listResult.getJsonArray("data"); - boolean exists = false; - if (list != null) { - for (int i = 0; i < list.size(); i++) { - var item = list.getJsonObject(i); - if (type.equals(item.getString("type"))) { - exists = true; - break; - } - } - } - + dbService.playgroundParserTypeExists(type, null).onSuccess(exists -> { if (exists) { promise.complete(JsonResult.error("解析器类型 " + type + " 已存在,请使用其他类型标识").toJsonObject()); return; @@ -493,25 +487,29 @@ public class PlaygroundApi { parser.put("ip", getClientIp(ctx.request())); parser.put("enabled", true); + try { + CustomParserRegistry.register(config); + } catch (Exception e) { + log.error("注册解析器失败", e); + promise.complete(JsonResult.error("保存失败,注册解析器失败: " + e.getMessage()).toJsonObject()); + return; + } + dbService.savePlaygroundParser(parser).onSuccess(result -> { - // 保存成功后,立即注册到解析器系统 - try { - CustomParserRegistry.register(config); - log.info("已注册演练场解析器: {} ({})", displayName, type); - promise.complete(JsonResult.success("保存并注册成功").toJsonObject()); - } catch (Exception e) { - log.error("注册解析器失败", e); - // 虽然注册失败,但保存成功了,返回警告 - promise.complete(JsonResult.success( - "保存成功,但注册失败(重启服务后会自动加载): " + e.getMessage() - ).toJsonObject()); + if (!result.getBoolean("success", false)) { + CustomParserRegistry.unregister(type); + promise.complete(JsonResult.error(result.getString("msg", "保存失败")).toJsonObject()); + return; } + log.info("已注册演练场解析器: {} ({})", displayName, type); + promise.complete(JsonResult.success("保存并注册成功").toJsonObject()); }).onFailure(e -> { + CustomParserRegistry.unregister(type); log.error("保存解析器失败", e); promise.complete(JsonResult.error("保存失败: " + e.getMessage()).toJsonObject()); }); }).onFailure(e -> { - log.error("获取解析器列表失败", e); + log.error("检查解析器类型失败", e); promise.complete(JsonResult.error("检查解析器失败: " + e.getMessage()).toJsonObject()); }); }).onFailure(e -> { @@ -557,6 +555,11 @@ public class PlaygroundApi { return promise.future(); } + if (jsCode.length() > MAX_CODE_LENGTH) { + promise.complete(JsonResult.error("代码长度超过限制(最大128KB),当前长度: " + jsCode.length() + " 字节").toJsonObject()); + return promise.future(); + } + // 解析元数据 try { var config = JsScriptMetadataParser.parseScript(jsCode); @@ -570,6 +573,7 @@ public class PlaygroundApi { boolean enabled = body.getBoolean("enabled", true); JsonObject parser = new JsonObject(); + parser.put("type", type); parser.put("name", name); parser.put("displayName", displayName); parser.put("description", description); @@ -579,29 +583,73 @@ public class PlaygroundApi { parser.put("jsCode", jsCode); parser.put("enabled", enabled); - dbService.updatePlaygroundParser(id, parser).onSuccess(result -> { - // 更新成功后,重新注册解析器 - try { - if (enabled) { - // 先注销旧的(如果存在) - CustomParserRegistry.unregister(type); - // 重新注册新的 - CustomParserRegistry.register(config); - log.info("已重新注册演练场解析器: {} ({})", displayName, type); - } else { - // 禁用时注销 - CustomParserRegistry.unregister(type); - log.info("已注销演练场解析器: {}", type); - } - promise.complete(JsonResult.success("更新并重新注册成功").toJsonObject()); - } catch (Exception e) { - log.error("重新注册解析器失败", e); - promise.complete(JsonResult.success( - "更新成功,但注册失败(重启服务后会自动加载): " + e.getMessage() - ).toJsonObject()); + dbService.getPlaygroundParserById(id).onSuccess(oldResult -> { + String oldType = null; + if (oldResult.getBoolean("success", false) && oldResult.getJsonObject("data") != null) { + oldType = oldResult.getJsonObject("data").getString("type"); + } else { + promise.complete(JsonResult.error("解析器不存在").toJsonObject()); + return; } + final String oldRegisteredType = oldType; + dbService.playgroundParserTypeExists(type, id).onSuccess(exists -> { + if (exists) { + promise.complete(JsonResult.error("解析器类型 " + type + " 已被其他解析器使用").toJsonObject()); + return; + } + + var oldConfig = CustomParserRegistry.get(oldRegisteredType); + boolean removedOldType = false; + boolean registeredNewType = false; + try { + if (StringUtils.isNotBlank(oldRegisteredType)) { + removedOldType = CustomParserRegistry.unregister(oldRegisteredType); + } + if (enabled) { + CustomParserRegistry.register(config); + registeredNewType = true; + } + } catch (Exception e) { + if (removedOldType && oldConfig != null) { + try { + CustomParserRegistry.register(oldConfig); + } catch (Exception restoreError) { + log.warn("恢复旧解析器注册失败: {}", oldRegisteredType, restoreError); + } + } + log.error("预注册解析器失败", e); + promise.complete(JsonResult.error("更新失败,注册解析器失败: " + e.getMessage()).toJsonObject()); + return; + } + final boolean shouldRollbackNewType = registeredNewType; + final boolean shouldRestoreOldType = removedOldType; + final var oldParserConfig = oldConfig; + + dbService.updatePlaygroundParser(id, parser).onSuccess(result -> { + if (!result.getBoolean("success", false)) { + rollbackParserRegistration(type, oldRegisteredType, shouldRollbackNewType, + shouldRestoreOldType, oldParserConfig); + promise.complete(JsonResult.error(result.getString("msg", "更新失败")).toJsonObject()); + return; + } + if (!enabled) { + log.info("已注销演练场解析器: {}", oldRegisteredType); + } else { + log.info("已重新注册演练场解析器: {} ({})", displayName, type); + } + promise.complete(JsonResult.success("更新并重新注册成功").toJsonObject()); + }).onFailure(e -> { + rollbackParserRegistration(type, oldRegisteredType, shouldRollbackNewType, + shouldRestoreOldType, oldParserConfig); + log.error("更新解析器失败", e); + promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject()); + }); + }).onFailure(e -> { + log.error("检查解析器类型失败", e); + promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject()); + }); }).onFailure(e -> { - log.error("更新解析器失败", e); + log.error("获取旧解析器信息失败", e); promise.complete(JsonResult.error("更新失败: " + e.getMessage()).toJsonObject()); }); @@ -617,6 +665,24 @@ public class PlaygroundApi { return promise.future(); } + private void rollbackParserRegistration(String newType, String oldType, boolean rollbackNewType, + boolean restoreOldType, cn.qaiu.parser.custom.CustomParserConfig oldConfig) { + if (rollbackNewType) { + try { + CustomParserRegistry.unregister(newType); + } catch (Exception rollbackError) { + log.warn("回滚新解析器注册失败: {}", newType, rollbackError); + } + } + if (restoreOldType && oldConfig != null) { + try { + CustomParserRegistry.register(oldConfig); + } catch (Exception restoreError) { + log.warn("恢复旧解析器注册失败: {}", oldType, restoreError); + } + } + } + /** * 删除解析器 */ @@ -695,5 +761,18 @@ public class PlaygroundApi { } return ip; } + + private String summarizeResult(Object result) { + if (result == null) { + return "null"; + } + if (result instanceof String str) { + return "String(length=" + str.length() + ")"; + } + if (result instanceof List list) { + return "List(size=" + list.size() + ")"; + } + return result.getClass().getSimpleName(); + } } diff --git a/web-service/src/main/java/cn/qaiu/lz/web/controller/ServerApi.java b/web-service/src/main/java/cn/qaiu/lz/web/controller/ServerApi.java index 6d612aa..282d444 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/controller/ServerApi.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/controller/ServerApi.java @@ -29,6 +29,8 @@ import lombok.extern.slf4j.Slf4j; @RouteHandler("/") public class ServerApi { + private static final String SKIP_CLIENT_LINKS = "_skipClientLinks"; + private final CacheService cacheService = AsyncServiceUtil.getAsyncServiceInstance(CacheService.class); private final DbService dbService = AsyncServiceUtil.getAsyncServiceInstance(DbService.class); @@ -38,16 +40,15 @@ public class ServerApi { String url = URLParamUtil.parserParams(request); // 构建 otherParam,包含 UA 和解码后的认证参数 - JsonObject otherParam = buildOtherParam(request, auth); + JsonObject otherParam = buildOtherParam(request, auth, true); cacheService.getCachedByShareUrlAndPwd(url, pwd, otherParam) .onSuccess(res -> ResponseUtil.redirect( - response.putHeader("nfd-cache-hit", res.getCacheHit().toString()) - .putHeader("nfd-cache-expires", res.getExpires()), + addCacheHeaders(response, res), res.getDirectLink(), promise)) .onFailure(t -> { recordDonatedAccountFailureIfNeeded(otherParam, t); - promise.fail(t.fillInStackTrace()); + promise.tryFail(t); }); return promise.future(); } @@ -60,6 +61,13 @@ public class ServerApi { .onFailure(t -> recordDonatedAccountFailureIfNeeded(otherParam, t)); } + public Future parseJsonForRedirect(HttpServerRequest request, String pwd, String auth) { + String url = URLParamUtil.parserParams(request); + JsonObject otherParam = buildOtherParam(request, auth, true); + return cacheService.getCachedByShareUrlAndPwd(url, pwd, otherParam) + .onFailure(t -> recordDonatedAccountFailureIfNeeded(otherParam, t)); + } + @RouteMapping(value = "/json/:type/:key", method = RouteMethod.GET) public Future parseKeyJson(HttpServerRequest request, String type, String key) { String pwd = ""; @@ -72,6 +80,18 @@ public class ServerApi { return cacheService.getCachedByShareKeyAndPwd(type, key, pwd, JsonObject.of("UA",request.headers().get("user-agent"), "_requestOrigin", origin)); } + public Future parseKeyJsonForRedirect(HttpServerRequest request, String type, String key) { + String pwd = ""; + if (key.contains("@")) { + String[] keys = key.split("@"); + key = keys[0]; + pwd = keys[1]; + } + String origin = resolveOrigin(request); + return cacheService.getCachedByShareKeyAndPwd(type, key, pwd, + JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", origin, SKIP_CLIENT_LINKS, true)); + } + @RouteMapping(value = "/:type/:key", method = RouteMethod.GET) public Future parseKey(HttpServerResponse response, HttpServerRequest request, String type, String key) { Promise promise = Promise.promise(); @@ -82,15 +102,23 @@ public class ServerApi { pwd = keys[1]; } String origin = resolveOrigin(request); - cacheService.getCachedByShareKeyAndPwd(type, key, pwd, JsonObject.of("UA",request.headers().get("user-agent"), "_requestOrigin", origin)) + cacheService.getCachedByShareKeyAndPwd(type, key, pwd, + JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", origin, SKIP_CLIENT_LINKS, true)) .onSuccess(res -> ResponseUtil.redirect( - response.putHeader("nfd-cache-hit", res.getCacheHit().toString()) - .putHeader("nfd-cache-expires", res.getExpires()), + addCacheHeaders(response, res), res.getDirectLink(), promise)) - .onFailure(t -> promise.fail(t.fillInStackTrace())); + .onFailure(promise::tryFail); return promise.future(); } + private static HttpServerResponse addCacheHeaders(HttpServerResponse response, CacheLinkInfo cacheLinkInfo) { + if (response.ended() || response.closed()) { + return response; + } + return response.putHeader("nfd-cache-hit", cacheLinkInfo.getCacheHit().toString()) + .putHeader("nfd-cache-expires", cacheLinkInfo.getExpires()); + } + /** * 解析请求来源地址,支持反向代理 */ @@ -114,7 +142,14 @@ public class ServerApi { * @return JsonObject */ private JsonObject buildOtherParam(HttpServerRequest request, String auth) { + return buildOtherParam(request, auth, false); + } + + private JsonObject buildOtherParam(HttpServerRequest request, String auth, boolean skipClientLinks) { JsonObject otherParam = JsonObject.of("UA", request.headers().get("user-agent"), "_requestOrigin", resolveOrigin(request)); + if (skipClientLinks) { + otherParam.put(SKIP_CLIENT_LINKS, true); + } // 解码认证参数 if (auth != null && !auth.isEmpty()) { diff --git a/web-service/src/main/java/cn/qaiu/lz/web/service/DbService.java b/web-service/src/main/java/cn/qaiu/lz/web/service/DbService.java index a996b4a..3b8ddfe 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/service/DbService.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/service/DbService.java @@ -25,6 +25,11 @@ public interface DbService extends BaseAsyncService { */ Future getPlaygroundParserList(); + /** + * 获取启动时需要注册的已启用演练场解析器 + */ + Future getEnabledPlaygroundParsersForLoad(); + /** * 保存演练场解析器 */ @@ -45,6 +50,11 @@ public interface DbService extends BaseAsyncService { */ Future getPlaygroundParserCount(); + /** + * 检查演练场解析器类型是否已存在 + */ + Future playgroundParserTypeExists(String type, Long excludeId); + /** * 根据ID获取演练场解析器 */ diff --git a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/CacheServiceImpl.java b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/CacheServiceImpl.java index 8dbd0c1..8b8a303 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/CacheServiceImpl.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/CacheServiceImpl.java @@ -27,6 +27,8 @@ import java.util.Map; @Slf4j public class CacheServiceImpl implements CacheService { + private static final String SKIP_CLIENT_LINKS = "_skipClientLinks"; + private final CacheManager cacheManager = new CacheManager(); static { @@ -62,10 +64,14 @@ public class CacheServiceImpl implements CacheService { try { tool = parserCreate.createTool(); } catch (Exception e) { - promise.fail(e.getCause().getCause()); + Throwable cause = e; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + promise.fail(cause); return; } - tool.parse().onSuccess(redirectUrl -> { + IPanTool.closeAfter(tool, tool::parse).onSuccess(redirectUrl -> { // 使用 effectiveCacheDuration long expires = System.currentTimeMillis() + effectiveCacheDuration * 60 * 1000L; result.setDirectLink(redirectUrl); @@ -74,7 +80,7 @@ public class CacheServiceImpl implements CacheService { result.setExpires(generateDate(expires)); // 调试日志:检查解析器返回的otherParam - log.info("[解析完成] shareKey={}, otherParam.keys={}, hasFileInfo={}", + log.debug("[解析完成] shareKey={}, otherParam.keys={}, hasFileInfo={}", cacheKey, shareLinkInfo.getOtherParam().keySet(), shareLinkInfo.getOtherParam().containsKey("fileInfo")); @@ -90,17 +96,19 @@ public class CacheServiceImpl implements CacheService { FileInfo fileInfo = (FileInfo) shareLinkInfo.getOtherParam().get("fileInfo"); result.setFileInfo(fileInfo); cacheLinkInfo.setFileInfo(fileInfo); - log.info("[设置文件信息] shareKey={}, fileName={}, size={}", + log.debug("[设置文件信息] shareKey={}, fileName={}, size={}", cacheKey, fileInfo.getFileName(), fileInfo.getSize()); } catch (Exception e) { log.error("文件对象转换异常: shareKey={}", cacheKey, e); } } else { - log.warn("[文件信息缺失] 解析器未返回fileInfo: shareKey={}, otherParam.keys={}", + log.debug("[文件信息缺失] 解析器未返回fileInfo: shareKey={}, otherParam.keys={}", cacheKey, shareLinkInfo.getOtherParam().keySet()); } - // 传递 downloadHeaders 并生成下载命令 - processDownloadHeaders(shareLinkInfo, cacheLinkInfo, result); + if (shouldGenerateClientLinks(shareLinkInfo)) { + // 传递 downloadHeaders 并生成下载命令 + processDownloadHeaders(shareLinkInfo, cacheLinkInfo, result); + } promise.complete(result); // 更新缓存 cacheManager.cacheShareLink(cacheLinkInfo); @@ -110,19 +118,21 @@ public class CacheServiceImpl implements CacheService { // 缓存命中,生成过期时间并生成下载命令 result.setExpires(generateDate(result.getExpiration())); - // 初始化 otherParam(如果为空) - if (result.getOtherParam() == null) { - result.setOtherParam(new HashMap<>()); + if (shouldGenerateClientLinks(shareLinkInfo)) { + // 初始化 otherParam(如果为空) + if (result.getOtherParam() == null) { + result.setOtherParam(new HashMap<>()); + } + + // 生成下载命令(aria2、curl) + generateDownloadCommands(result); } - // 生成下载命令(aria2、curl) - generateDownloadCommands(result); - promise.complete(result); cacheManager.updateTotalByField(cacheKey, CacheTotalField.CACHE_HIT_TOTAL) .onFailure(e -> log.error("更新缓存命中计数失败: cacheKey={}", cacheKey, e)); } - }).onFailure(t -> promise.fail(t.fillInStackTrace())); + }).onFailure(promise::tryFail); return promise.future(); } @@ -131,6 +141,13 @@ public class CacheServiceImpl implements CacheService { return DateFormatUtils.format(new Date(ts), "yyyy-MM-dd HH:mm:ss"); } + private boolean shouldGenerateClientLinks(ShareLinkInfo shareLinkInfo) { + if (shareLinkInfo == null || shareLinkInfo.getOtherParam() == null) { + return true; + } + return !Boolean.TRUE.equals(shareLinkInfo.getOtherParam().get(SKIP_CLIENT_LINKS)); + } + /** * 处理下载请求头并生成下载命令 * 从 shareLinkInfo 中提取 downloadHeaders,传递到 cacheLinkInfo 和 result @@ -147,7 +164,7 @@ public class CacheServiceImpl implements CacheService { Map headers = (Map) shareLinkInfo.getOtherParam().get("downloadHeaders"); if (headers != null) { downloadHeaders = headers; - log.info("从shareLinkInfo提取downloadHeaders: shareKey={}, 请求头数量={}", + log.debug("从shareLinkInfo提取downloadHeaders: shareKey={}, 请求头数量={}", cacheLinkInfo.getShareKey(), downloadHeaders.size()); } } @@ -255,14 +272,24 @@ public class CacheServiceImpl implements CacheService { @Override public Future getCachedByShareKeyAndPwd(String type, String shareKey, String pwd, JsonObject otherParam) { - ParserCreate parserCreate = ParserCreate.fromType(type).shareKey(shareKey).setShareLinkInfoPwd(pwd); + ParserCreate parserCreate; + try { + parserCreate = ParserCreate.fromType(type).shareKey(shareKey).setShareLinkInfoPwd(pwd); + } catch (Exception e) { + return Future.failedFuture(e); + } parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap()); return getAndSaveCachedShareLink(parserCreate); } @Override public Future getCachedByShareUrlAndPwd(String shareUrl, String pwd, JsonObject otherParam) { - ParserCreate parserCreate = ParserCreate.fromShareUrl(shareUrl).setShareLinkInfoPwd(pwd); + ParserCreate parserCreate; + try { + parserCreate = ParserCreate.fromShareUrl(shareUrl).setShareLinkInfoPwd(pwd); + } catch (Exception e) { + return Future.failedFuture(e); + } parserCreate.getShareLinkInfo().getOtherParam().putAll(otherParam.getMap()); // 检查是否有临时认证参数 diff --git a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/DbServiceImpl.java b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/DbServiceImpl.java index 4d8584a..dae80f3 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/DbServiceImpl.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/DbServiceImpl.java @@ -85,33 +85,18 @@ public class DbServiceImpl implements DbService { public Future getPlaygroundParserList() { JDBCPool client = JDBCPoolInit.instance().getPool(); Promise promise = Promise.promise(); - String sql = "SELECT * FROM playground_parser ORDER BY create_time DESC"; + String sql = """ + SELECT id, name, type, display_name, description, author, version, + match_pattern, ip, create_time, update_time, enabled + FROM playground_parser + ORDER BY create_time DESC + LIMIT 100 + """; client.query(sql).execute().onSuccess(rows -> { List list = new ArrayList<>(); for (Row row : rows) { - JsonObject parser = new JsonObject(); - parser.put("id", row.getLong("id")); - parser.put("name", row.getString("name")); - parser.put("type", row.getString("type")); - parser.put("displayName", row.getString("display_name")); - parser.put("description", row.getString("description")); - parser.put("author", row.getString("author")); - parser.put("version", row.getString("version")); - parser.put("matchPattern", row.getString("match_pattern")); - parser.put("jsCode", row.getString("js_code")); - parser.put("ip", row.getString("ip")); - // 将LocalDateTime转换为字符串格式,避免序列化为数组 - var createTime = row.getLocalDateTime("create_time"); - if (createTime != null) { - parser.put("createTime", createTime.toString().replace("T", " ")); - } - var updateTime = row.getLocalDateTime("update_time"); - if (updateTime != null) { - parser.put("updateTime", updateTime.toString().replace("T", " ")); - } - parser.put("enabled", row.getBoolean("enabled")); - list.add(parser); + list.add(toPlaygroundParserJson(row, false)); } promise.complete(JsonResult.data(list).toJsonObject()); }).onFailure(e -> { @@ -122,6 +107,33 @@ public class DbServiceImpl implements DbService { return promise.future(); } + @Override + public Future getEnabledPlaygroundParsersForLoad() { + JDBCPool client = JDBCPoolInit.instance().getPool(); + Promise promise = Promise.promise(); + String sql = """ + SELECT id, name, type, display_name, description, author, version, + match_pattern, js_code, ip, create_time, update_time, enabled + FROM playground_parser + WHERE enabled = TRUE + ORDER BY update_time DESC, create_time DESC + LIMIT 100 + """; + + client.query(sql).execute().onSuccess(rows -> { + List list = new ArrayList<>(); + for (Row row : rows) { + list.add(toPlaygroundParserJson(row, true)); + } + promise.complete(JsonResult.data(list).toJsonObject()); + }).onFailure(e -> { + log.error("getEnabledPlaygroundParsersForLoad failed", e); + promise.fail(e); + }); + + return promise.future(); + } + @Override public Future savePlaygroundParser(JsonObject parser) { JDBCPool client = JDBCPoolInit.instance().getPool(); @@ -164,13 +176,14 @@ public class DbServiceImpl implements DbService { String sql = """ UPDATE playground_parser - SET name = ?, display_name = ?, description = ?, author = ?, + SET type = ?, name = ?, display_name = ?, description = ?, author = ?, version = ?, match_pattern = ?, js_code = ?, update_time = NOW(), enabled = ? WHERE id = ? """; client.preparedQuery(sql) .execute(Tuple.of( + parser.getString("type"), parser.getString("name"), parser.getString("displayName"), parser.getString("description"), @@ -182,6 +195,10 @@ public class DbServiceImpl implements DbService { id )) .onSuccess(res -> { + if (res.rowCount() == 0) { + promise.complete(JsonResult.error("解析器不存在").toJsonObject()); + return; + } promise.complete(JsonResult.success("更新成功").toJsonObject()); }) .onFailure(e -> { @@ -230,6 +247,27 @@ public class DbServiceImpl implements DbService { return promise.future(); } + @Override + public Future playgroundParserTypeExists(String type, Long excludeId) { + JDBCPool client = JDBCPoolInit.instance().getPool(); + Promise promise = Promise.promise(); + + String sql = excludeId == null + ? "SELECT COUNT(*) as count FROM playground_parser WHERE type = ?" + : "SELECT COUNT(*) as count FROM playground_parser WHERE type = ? AND id <> ?"; + Tuple params = excludeId == null ? Tuple.of(type) : Tuple.of(type, excludeId); + + client.preparedQuery(sql).execute(params).onSuccess(rows -> { + Integer count = rows.iterator().next().getInteger("count"); + promise.complete(count != null && count > 0); + }).onFailure(e -> { + log.error("playgroundParserTypeExists failed", e); + promise.fail(e); + }); + + return promise.future(); + } + @Override public Future getPlaygroundParserById(Long id) { JDBCPool client = JDBCPoolInit.instance().getPool(); @@ -242,28 +280,7 @@ public class DbServiceImpl implements DbService { .onSuccess(rows -> { if (rows.size() > 0) { Row row = rows.iterator().next(); - JsonObject parser = new JsonObject(); - parser.put("id", row.getLong("id")); - parser.put("name", row.getString("name")); - parser.put("type", row.getString("type")); - parser.put("displayName", row.getString("display_name")); - parser.put("description", row.getString("description")); - parser.put("author", row.getString("author")); - parser.put("version", row.getString("version")); - parser.put("matchPattern", row.getString("match_pattern")); - parser.put("jsCode", row.getString("js_code")); - parser.put("ip", row.getString("ip")); - // 将LocalDateTime转换为字符串格式,避免序列化为数组 - var createTime = row.getLocalDateTime("create_time"); - if (createTime != null) { - parser.put("createTime", createTime.toString().replace("T", " ")); - } - var updateTime = row.getLocalDateTime("update_time"); - if (updateTime != null) { - parser.put("updateTime", updateTime.toString().replace("T", " ")); - } - parser.put("enabled", row.getBoolean("enabled")); - promise.complete(JsonResult.data(parser).toJsonObject()); + promise.complete(JsonResult.data(toPlaygroundParserJson(row, true)).toJsonObject()); } else { promise.fail("解析器不存在"); } @@ -276,6 +293,32 @@ public class DbServiceImpl implements DbService { return promise.future(); } + private JsonObject toPlaygroundParserJson(Row row, boolean includeJsCode) { + JsonObject parser = new JsonObject(); + parser.put("id", row.getLong("id")); + parser.put("name", row.getString("name")); + parser.put("type", row.getString("type")); + parser.put("displayName", row.getString("display_name")); + parser.put("description", row.getString("description")); + parser.put("author", row.getString("author")); + parser.put("version", row.getString("version")); + parser.put("matchPattern", row.getString("match_pattern")); + if (includeJsCode) { + parser.put("jsCode", row.getString("js_code")); + } + parser.put("ip", row.getString("ip")); + var createTime = row.getLocalDateTime("create_time"); + if (createTime != null) { + parser.put("createTime", createTime.toString().replace("T", " ")); + } + var updateTime = row.getLocalDateTime("update_time"); + if (updateTime != null) { + parser.put("updateTime", updateTime.toString().replace("T", " ")); + } + parser.put("enabled", row.getBoolean("enabled")); + return parser; + } + // ========== 捐赠账号相关 ========== @Override diff --git a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/ShoutServiceImpl.java b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/ShoutServiceImpl.java index d1ed531..618857b 100644 --- a/web-service/src/main/java/cn/qaiu/lz/web/service/impl/ShoutServiceImpl.java +++ b/web-service/src/main/java/cn/qaiu/lz/web/service/impl/ShoutServiceImpl.java @@ -3,9 +3,11 @@ package cn.qaiu.lz.web.service.impl; import cn.qaiu.db.pool.JDBCPoolInit; import cn.qaiu.lz.web.service.ShoutService; import cn.qaiu.vx.core.annotaions.Service; +import cn.qaiu.vx.core.util.VertxHolder; import cn.qaiu.vx.core.model.JsonResult; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.jdbcclient.JDBCPool; import io.vertx.sqlclient.Tuple; @@ -14,6 +16,8 @@ import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -21,10 +25,18 @@ import java.util.Random; public class ShoutServiceImpl implements ShoutService { private static final int CODE_LENGTH = 6; private static final int EXPIRE_HOURS = 24; + private static final long CLEANUP_INTERVAL_MILLIS = 3600_000L; + private static final long CLEANUP_SHUTDOWN_WAIT_MILLIS = 5_000L; + private static final AtomicBoolean CLEANUP_REGISTERED = new AtomicBoolean(false); + private static final AtomicInteger CLEANUP_IN_FLIGHT = new AtomicInteger(0); + private static final Object CLEANUP_MONITOR = new Object(); + private static volatile Long cleanupTimerId; + private static volatile Vertx cleanupVertx; private final JDBCPool jdbcPool = JDBCPoolInit.instance().getPool(); @Override public Future submitMessage(String content, String host) { + registerCleanup(); Promise promise = Promise.promise(); String code = generateRandomCode(); // 判断一下当前code是否存在消息 @@ -50,6 +62,7 @@ public class ShoutServiceImpl implements ShoutService { @Override public Future retrieveMessage(String code) { + registerCleanup(); Promise promise = Promise.promise(); String sql = "SELECT content FROM t_messages WHERE code = ? AND expire_time > NOW()"; @@ -79,6 +92,109 @@ public class ShoutServiceImpl implements ShoutService { jdbcPool.preparedQuery(sql).execute(Tuple.of(code)); } + private static void registerCleanup() { + if (!CLEANUP_REGISTERED.compareAndSet(false, true)) { + return; + } + try { + Vertx vertx = VertxHolder.getVertxInstance(); + cleanupVertx = vertx; + cleanupTimerId = vertx.setPeriodic(CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, + id -> cleanupExpiredMessages()); + cleanupExpiredMessages(); + } catch (Exception e) { + cleanupTimerId = null; + cleanupVertx = null; + CLEANUP_REGISTERED.set(false); + log.warn("注册消息清理任务失败", e); + } + } + + public static void cancelCleanup() { + Long timerId = cleanupTimerId; + Vertx vertx = cleanupVertx; + cleanupTimerId = null; + cleanupVertx = null; + CLEANUP_REGISTERED.set(false); + + if (timerId == null || vertx == null) { + waitForCleanupToFinish(); + return; + } + try { + if (vertx.cancelTimer(timerId)) { + log.info("消息定时清理任务已取消"); + } + } catch (Exception e) { + log.warn("取消消息定时清理任务失败", e); + } + waitForCleanupToFinish(); + } + + private static void cleanupExpiredMessages() { + cleanupStarted(); + boolean asyncCleanupStarted = false; + try { + if (!CLEANUP_REGISTERED.get()) { + return; + } + JDBCPoolInit poolInit = JDBCPoolInit.instance(); + if (poolInit == null || poolInit.getPool() == null) { + log.debug("数据库连接池未就绪,跳过消息定时清理"); + return; + } + JDBCPool pool = poolInit.getPool(); + String sql = "DELETE FROM t_messages WHERE expire_time < NOW()"; + Future> cleanupFuture = pool.query(sql).execute(); + asyncCleanupStarted = true; + cleanupFuture.onSuccess(res -> { + if (res.rowCount() > 0) { + log.info("清理过期消息 {} 条", res.rowCount()); + } + }) + .onFailure(e -> log.warn("清理过期消息失败", e)) + .onComplete(ar -> cleanupFinished()); + } catch (Exception e) { + log.warn("清理过期消息任务启动失败", e); + } finally { + if (!asyncCleanupStarted) { + cleanupFinished(); + } + } + } + + private static void cleanupStarted() { + CLEANUP_IN_FLIGHT.incrementAndGet(); + } + + private static void cleanupFinished() { + if (CLEANUP_IN_FLIGHT.decrementAndGet() <= 0) { + synchronized (CLEANUP_MONITOR) { + CLEANUP_MONITOR.notifyAll(); + } + } + } + + private static void waitForCleanupToFinish() { + long deadline = System.currentTimeMillis() + CLEANUP_SHUTDOWN_WAIT_MILLIS; + synchronized (CLEANUP_MONITOR) { + while (CLEANUP_IN_FLIGHT.get() > 0) { + long waitMillis = deadline - System.currentTimeMillis(); + if (waitMillis <= 0) { + log.warn("等待消息定时清理结束超时,剩余任务数: {}", CLEANUP_IN_FLIGHT.get()); + return; + } + try { + CLEANUP_MONITOR.wait(waitMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("等待消息定时清理结束被中断"); + return; + } + } + } + } + private String generateRandomCode() { Random random = new Random(); StringBuilder sb = new StringBuilder(); diff --git a/web-service/src/main/resources/app-dev.yml b/web-service/src/main/resources/app-dev.yml index b5e3892..4a7f80c 100644 --- a/web-service/src/main/resources/app-dev.yml +++ b/web-service/src/main/resources/app-dev.yml @@ -17,7 +17,7 @@ proxyConf: server-proxy # JS演练场配置 playground: # 是否启用演练场,默认false不启用 - enabled: true + enabled: false # 公开模式,默认false需要密码访问,设为true则无需密码 public: false # 访问密码,建议修改默认密码! @@ -86,12 +86,12 @@ cache: ct: 30 # 城通网盘 ec: 5 # 移动云空间 fc: # 亿方云 - fj: 20 # 小飞机网盘 + fj: 20 # 小飞机网盘 链接有效期约30分钟 fs: # 飞书云盘 - iz: 20 # 蓝奏云优享 + iz: 20 # 蓝奏云优享 链接有效期约30分钟 kd: # 可道云 le: 2879 # 联想乐云 - lz: 20 # 蓝奏云 + lz: 30 # 蓝奏云 链接有效期约45分钟 other: # 其他网盘 p115: 30 # 115网盘 pdb: # Dropbox diff --git a/web-service/src/main/resources/logback.xml b/web-service/src/main/resources/logback.xml index af6dda5..86adaf2 100644 --- a/web-service/src/main/resources/logback.xml +++ b/web-service/src/main/resources/logback.xml @@ -37,10 +37,10 @@ - - 0 + + 20 - 256 + 512 @@ -55,7 +55,7 @@ - +