From 46c6827edae9c220453093f15425bf9e54e96d25 Mon Sep 17 00:00:00 2001 From: yukaidi Date: Wed, 10 Jun 2026 21:19:18 +0800 Subject: [PATCH] fix(core): harden proxy and routing lifecycle --- .../src/main/java/cn/qaiu/vx/core/Deploy.java | 64 +++- .../cn/qaiu/vx/core/base/BaseHttpApi.java | 83 ++++- .../handlerfactory/RouterHandlerFactory.java | 45 ++- .../java/cn/qaiu/vx/core/util/CommonUtil.java | 24 +- .../cn/qaiu/vx/core/util/FutureUtils.java | 40 ++- .../cn/qaiu/vx/core/util/ReflectionUtil.java | 12 +- .../cn/qaiu/vx/core/util/ResponseUtil.java | 23 +- .../cn/qaiu/vx/core/util/SharedDataUtil.java | 9 +- .../vx/core/verticle/HttpProxyVerticle.java | 322 ++++++++++++++---- .../core/verticle/ReverseProxyVerticle.java | 183 ++++++++-- .../qaiu/vx/core/verticle/RouterVerticle.java | 13 +- 11 files changed, 644 insertions(+), 174 deletions(-) diff --git a/core/src/main/java/cn/qaiu/vx/core/Deploy.java b/core/src/main/java/cn/qaiu/vx/core/Deploy.java index 5fb606f..3b3103f 100644 --- a/core/src/main/java/cn/qaiu/vx/core/Deploy.java +++ b/core/src/main/java/cn/qaiu/vx/core/Deploy.java @@ -21,6 +21,8 @@ import java.nio.file.Path; import java.util.Calendar; import java.util.Date; import java.util.UUID; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.LockSupport; import static cn.qaiu.vx.core.util.ConfigConstant.*; @@ -45,11 +47,25 @@ public final class Deploy { private Handler handle; private Thread mainThread; + private final List preShutdownTasks = new CopyOnWriteArrayList<>(); + private final List postShutdownTasks = new CopyOnWriteArrayList<>(); public static Deploy instance() { return INSTANCE; } + public void addPreShutdownTask(Runnable task) { + if (task != null) { + preShutdownTasks.add(task); + } + } + + public void addPostShutdownTask(Runnable task) { + if (task != null) { + postShutdownTasks.add(task); + } + } + /** * * @param args 启动参数 @@ -133,9 +149,16 @@ public final class Deploy { customConfig = globalConfig.getJsonObject(CUSTOM); JsonObject vertxConfig = globalConfig.getJsonObject(VERTX); - Integer vertxConfigELPS = vertxConfig.getInteger(EVENT_LOOP_POOL_SIZE); - var vertxOptions = vertxConfigELPS == 0 ? - new VertxOptions() : new VertxOptions(vertxConfig); + JsonObject vertxOptionsConfig = vertxConfig.copy(); + if (vertxOptionsConfig.getInteger(EVENT_LOOP_POOL_SIZE, 0) == 0) { + vertxOptionsConfig.remove(EVENT_LOOP_POOL_SIZE); + } + if (vertxOptionsConfig.getInteger("workerPoolSize", 0) == 0) { + vertxOptionsConfig.remove("workerPoolSize"); + } + Integer vertxConfigELPS = vertxConfig.getInteger(EVENT_LOOP_POOL_SIZE, 0); + var vertxOptions = vertxOptionsConfig.isEmpty() ? + new VertxOptions() : new VertxOptions(vertxOptionsConfig); // vertxOptions.setAddressResolverOptions( // new AddressResolverOptions(). @@ -151,12 +174,16 @@ public final class Deploy { // 注册 ShutdownHook,确保进程退出时优雅关闭资源 Runtime.getRuntime().addShutdownHook(new Thread(() -> { - LOGGER.info("JVM shutting down, closing Vert.x..."); + LOGGER.info("JVM shutting down..."); + runShutdownTasks("before Vert.x close", preShutdownTasks); try { + LOGGER.info("Closing Vert.x..."); vertx.close().toCompletionStage().toCompletableFuture().get(10, java.util.concurrent.TimeUnit.SECONDS); LOGGER.info("Vert.x closed successfully"); } catch (Exception e) { LOGGER.warn("Vert.x close error or timeout", e); + } finally { + runShutdownTasks("after Vert.x close", postShutdownTasks); } })); //配置保存在共享数据中 @@ -165,24 +192,25 @@ public final class Deploy { localMap.put(GLOBAL_CONFIG, globalConfig); localMap.put(CUSTOM_CONFIG, customConfig); localMap.put(SERVER, globalConfig.getJsonObject(SERVER)); - var future0 = vertx.createSharedWorkerExecutor("other-handle") - .executeBlocking(() -> { + WorkerExecutor otherHandleExecutor = vertx.createSharedWorkerExecutor("other-handle"); + var future0 = otherHandleExecutor.executeBlocking(() -> { handle.handle(globalConfig); return "Other handle complete"; }); future0.onSuccess(res -> { + otherHandleExecutor.close(); LOGGER.info(res); // 部署 路由、异步service、反向代理 服务 var future1 = vertx.deployVerticle(RouterVerticle.class, getWorkDeploymentOptions("Router")); var future2 = vertx.deployVerticle(ServiceVerticle.class, getWorkDeploymentOptions("Service")); - var future3 = vertx.deployVerticle(ReverseProxyVerticle.class, getWorkDeploymentOptions("proxy")); + var future3 = vertx.deployVerticle(ReverseProxyVerticle.class, getWorkDeploymentOptions("proxy", 1)); JsonObject jsonObject = ((JsonObject) localMap.get(GLOBAL_CONFIG)).getJsonObject("proxy-server"); if (jsonObject != null) { genPwd(jsonObject); - var future4 = vertx.deployVerticle(HttpProxyVerticle.class, getWorkDeploymentOptions("proxy")); + var future4 = vertx.deployVerticle(HttpProxyVerticle.class, getWorkDeploymentOptions("proxy", 1)); future4.onSuccess(LOGGER::info); future4.onFailure(e -> LOGGER.error("Other handle error", e)); Future.all(future1, future2, future3, future4) @@ -194,7 +222,10 @@ public final class Deploy { .onFailure(this::deployVerticalFailed); } - }).onFailure(e -> LOGGER.error("Other handle error", e)); + }).onFailure(e -> { + otherHandleExecutor.close(); + LOGGER.error("Other handle error", e); + }); } private static void genPwd(JsonObject jsonObject) { @@ -211,6 +242,21 @@ public final class Deploy { jsonObject.getString("password")); LOGGER.info("==============server info================"); } + + private static void runShutdownTasks(String stage, List tasks) { + if (tasks.isEmpty()) { + return; + } + LOGGER.info("Running {} shutdown tasks: {}", stage, tasks.size()); + for (Runnable task : tasks) { + try { + task.run(); + } catch (Exception e) { + LOGGER.warn("Shutdown task failed at stage {}", stage, e); + } + } + } + /** * 部署失败 * diff --git a/core/src/main/java/cn/qaiu/vx/core/base/BaseHttpApi.java b/core/src/main/java/cn/qaiu/vx/core/base/BaseHttpApi.java index 0b8261c..d3d27a5 100644 --- a/core/src/main/java/cn/qaiu/vx/core/base/BaseHttpApi.java +++ b/core/src/main/java/cn/qaiu/vx/core/base/BaseHttpApi.java @@ -1,14 +1,20 @@ package cn.qaiu.vx.core.base; +import cn.qaiu.vx.core.annotaions.HandleSortFilter; import cn.qaiu.vx.core.interceptor.AfterInterceptor; import cn.qaiu.vx.core.model.JsonResult; -import cn.qaiu.vx.core.util.CommonUtil; import cn.qaiu.vx.core.util.ReflectionUtil; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import static cn.qaiu.vx.core.util.ResponseUtil.*; @@ -22,9 +28,10 @@ public interface BaseHttpApi { // 需要扫描注册的Router路径 Reflections reflections = ReflectionUtil.getReflections(); + Logger LOGGER = LoggerFactory.getLogger(BaseHttpApi.class); default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) { - if (!ctx.response().ended()) { + if (!isResponseDone(ctx)) { fireJsonObjectResponse(ctx, jsonObject); } handleAfterInterceptor(ctx, jsonObject); @@ -32,14 +39,14 @@ public interface BaseHttpApi { default void doFireJsonResultResponse(RoutingContext ctx, JsonResult jsonResult) { - if (!ctx.response().ended()) { + if (!isResponseDone(ctx)) { fireJsonResultResponse(ctx, jsonResult); } handleAfterInterceptor(ctx, jsonResult.toJsonObject()); } default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) { - if (!ctx.response().ended()) { + if (!isResponseDone(ctx)) { fireJsonObjectResponse(ctx, jsonObject, statusCode); } handleAfterInterceptor(ctx, jsonObject); @@ -47,30 +54,78 @@ public interface BaseHttpApi { default void doFireJsonResultResponse(RoutingContext ctx, JsonResult jsonResult, int statusCode) { - if (!ctx.response().ended()) { + if (!isResponseDone(ctx)) { fireJsonResultResponse(ctx, jsonResult, statusCode); } handleAfterInterceptor(ctx, jsonResult.toJsonObject()); } default Set getAfterInterceptor() { + return AfterInterceptorHolder.INSTANCES; + } - Set> afterInterceptorClassSet = - reflections.getSubTypesOf(AfterInterceptor.class); - if (afterInterceptorClassSet == null) { - return null; + class AfterInterceptorHolder { + private static final Set INSTANCES = loadAfterInterceptors(); + + private static Set loadAfterInterceptors() { + Set> afterInterceptorClassSet = + reflections.getSubTypesOf(AfterInterceptor.class); + if (afterInterceptorClassSet == null || afterInterceptorClassSet.isEmpty()) { + return Collections.emptySet(); + } + return afterInterceptorClassSet.stream() + .filter(AfterInterceptorHolder::isEnabled) + .sorted(AfterInterceptorHolder::compareOrder) + .map(AfterInterceptorHolder::newInterceptor) + .filter(Objects::nonNull) + .collect(Collectors.collectingAndThen( + Collectors.toCollection(LinkedHashSet::new), + Collections::unmodifiableSet)); + } + + private static boolean isEnabled(Class clazz) { + HandleSortFilter sort = clazz.getAnnotation(HandleSortFilter.class); + return sort == null || sort.value() >= 0; + } + + private static int compareOrder(Class left, Class right) { + return Integer.compare(order(left), order(right)); + } + + private static int order(Class clazz) { + HandleSortFilter sort = clazz.getAnnotation(HandleSortFilter.class); + return sort == null ? 0 : sort.value(); + } + + private static AfterInterceptor newInterceptor(Class clazz) { + try { + return ReflectionUtil.newWithNoParam(clazz); + } catch (Exception e) { + LOGGER.warn("AfterInterceptor 初始化失败,已跳过: {}", clazz.getName(), e); + return null; + } } - return CommonUtil.sortClassSet(afterInterceptorClassSet); } default void handleAfterInterceptor(RoutingContext ctx, JsonObject jsonObject) { - Set afterInterceptor = getAfterInterceptor(); - if (afterInterceptor != null) { - afterInterceptor.forEach(ai -> ai.handle(ctx, jsonObject)); + if (ctx.response().closed()) { + return; } - if (!ctx.response().ended()) { + Set afterInterceptor = getAfterInterceptor(); + afterInterceptor.forEach(ai -> { + try { + ai.handle(ctx, jsonObject); + } catch (Exception e) { + LOGGER.warn("AfterInterceptor 执行失败: {}", ai.getClass().getName(), e); + } + }); + if (!isResponseDone(ctx)) { fireTextResponse(ctx, "handleAfterInterceptor: response not end"); } } + default boolean isResponseDone(RoutingContext ctx) { + return ctx.response().ended() || ctx.response().closed(); + } + } diff --git a/core/src/main/java/cn/qaiu/vx/core/handlerfactory/RouterHandlerFactory.java b/core/src/main/java/cn/qaiu/vx/core/handlerfactory/RouterHandlerFactory.java index 92bb224..7d4e711 100644 --- a/core/src/main/java/cn/qaiu/vx/core/handlerfactory/RouterHandlerFactory.java +++ b/core/src/main/java/cn/qaiu/vx/core/handlerfactory/RouterHandlerFactory.java @@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -96,7 +97,9 @@ public class RouterHandlerFactory implements BaseHttpApi { mainRouter.route().handler(CorsHandler.create().addRelativeOrigin(".*").allowCredentials(true).allowedMethods(httpMethods)); // 配置文件上传路径 - mainRouter.route().handler(BodyHandler.create().setUploadsDirectory("uploads")); + mainRouter.route().handler(BodyHandler.create() + .setUploadsDirectory("uploads") + .setBodyLimit(2L * 1024 * 1024)); // 拦截器 Set> interceptorSet = getInterceptorSet(); @@ -175,7 +178,7 @@ public class RouterHandlerFactory implements BaseHttpApi { route.handler(TimeoutHandler.create(SharedDataUtil.getCustomConfig().getInteger(ROUTE_TIME_OUT))); route.handler(ResponseTimeHandler.create()); route.handler(ctx -> handlerMethod(instance, method, ctx)).failureHandler(ctx -> { - if (ctx.response().ended()) return; + if (isResponseDone(ctx)) return; // 超时处理器状态码503 if (ctx.statusCode() == 503 || ctx.failure() == null) { doFireJsonResultResponse(ctx, JsonResult.error("未知异常, 请联系管理员"), 503); @@ -394,26 +397,34 @@ public class RouterHandlerFactory implements BaseHttpApi { if (data instanceof JsonResult jsonResult) { doFireJsonResultResponse(ctx, (JsonResult) data, jsonResult.getCode()); - } - if (data instanceof JsonObject) { + } else if (data instanceof JsonObject) { doFireJsonObjectResponse(ctx, ((JsonObject) data)); } else if (data instanceof Future) { // 处理异步响应 - ((Future) data).onSuccess(res -> { - if (res instanceof JsonResult jsonResult) { - doFireJsonResultResponse(ctx, jsonResult, jsonResult.getCode()); + Future responseFuture = (Future) data; + AtomicReference ctxRef = new AtomicReference<>(ctx); + ctx.addEndHandler(v -> ctxRef.set(null)); + responseFuture.onComplete(ar -> { + RoutingContext responseCtx = ctxRef.getAndSet(null); + if (responseCtx == null || isResponseDone(responseCtx)) { + return; } - if (res instanceof JsonObject) { - doFireJsonObjectResponse(ctx, ((JsonObject) res)); - } else if (res != null) { - doFireJsonResultResponse(ctx, JsonResult.data(res)); + if (ar.succeeded()) { + Object res = ar.result(); + if (res instanceof JsonResult jsonResult) { + doFireJsonResultResponse(responseCtx, jsonResult, jsonResult.getCode()); + } else if (res instanceof JsonObject) { + doFireJsonObjectResponse(responseCtx, ((JsonObject) res)); + } else if (res != null) { + doFireJsonResultResponse(responseCtx, JsonResult.data(res)); + } else { + doFireJsonResultResponse(responseCtx, JsonResult.data(null)); + } } else { - doFireJsonResultResponse(ctx, JsonResult.data(null)); + Throwable e = ar.cause(); + LOGGER.error("请求处理失败", e); + String msg = e != null && e.getMessage() != null ? e.getMessage() : "服务器内部错误"; + doFireJsonResultResponse(responseCtx, JsonResult.error(msg), 500); } - - }).onFailure(e -> { - LOGGER.error("请求处理失败", e); - String msg = e.getMessage() != null ? e.getMessage() : "服务器内部错误"; - doFireJsonResultResponse(ctx, JsonResult.error(msg), 500); }); } else { doFireJsonResultResponse(ctx, JsonResult.data(data)); diff --git a/core/src/main/java/cn/qaiu/vx/core/util/CommonUtil.java b/core/src/main/java/cn/qaiu/vx/core/util/CommonUtil.java index 3f77baa..6bf4c8b 100644 --- a/core/src/main/java/cn/qaiu/vx/core/util/CommonUtil.java +++ b/core/src/main/java/cn/qaiu/vx/core/util/CommonUtil.java @@ -17,6 +17,8 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -29,6 +31,16 @@ public class CommonUtil { private static final Logger LOGGER = LoggerFactory.getLogger(CommonUtil.class); + /** 正则表达式缓存,避免每次调用重新编译 */ + private static final ConcurrentHashMap PATTERN_CACHE = new ConcurrentHashMap<>(); + + /** + * 获取预编译的 Pattern(带缓存) + */ + private static Pattern getCachedPattern(String regex) { + return PATTERN_CACHE.computeIfAbsent(regex, Pattern::compile); + } + /** * 匹配正则list * @@ -39,7 +51,7 @@ public class CommonUtil { public static boolean matchRegList(List regList, String destStr) { // 判断是否忽略 for (Object ignores : regList) { - if (destStr.matches(ignores.toString())) { + if (getCachedPattern(ignores.toString()).matcher(destStr).matches()) { return true; } } @@ -147,10 +159,12 @@ public class CommonUtil { public static String getAppVersion() { if (null == appVersion) { Properties properties = new Properties(); - try { - properties.load(CommonUtil.class.getClassLoader().getResourceAsStream("app.properties")); - if (!properties.isEmpty()) { - appVersion = properties.getProperty("app.version") + "build" + properties.getProperty("build"); + try (var is = CommonUtil.class.getClassLoader().getResourceAsStream("app.properties")) { + if (is != null) { + properties.load(is); + if (!properties.isEmpty()) { + appVersion = properties.getProperty("app.version") + "build" + properties.getProperty("build"); + } } } catch (IOException e) { LOGGER.error("读取app.properties失败", e); diff --git a/core/src/main/java/cn/qaiu/vx/core/util/FutureUtils.java b/core/src/main/java/cn/qaiu/vx/core/util/FutureUtils.java index f008994..dfc277f 100644 --- a/core/src/main/java/cn/qaiu/vx/core/util/FutureUtils.java +++ b/core/src/main/java/cn/qaiu/vx/core/util/FutureUtils.java @@ -4,17 +4,41 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class FutureUtils { - public static T getResult(Future future) { - try { - return future.toCompletionStage().toCompletableFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } + /** 默认同步等待超时时间(秒) */ + private static final long DEFAULT_TIMEOUT_SECONDS = 120; + + public static T getResult(Future future) { + try { + return future.toCompletionStage().toCompletableFuture() + .get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("线程被中断", e); + } catch (TimeoutException e) { + throw new RuntimeException("等待Future超时(" + DEFAULT_TIMEOUT_SECONDS + "秒)", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw new RuntimeException(cause != null ? cause : e); + } + } + public static T getResult(Promise promise) { - return promise.future().toCompletionStage().toCompletableFuture().join(); + try { + return promise.future().toCompletionStage().toCompletableFuture() + .get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("线程被中断", e); + } catch (TimeoutException e) { + throw new RuntimeException("等待Promise超时(" + DEFAULT_TIMEOUT_SECONDS + "秒)", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw new RuntimeException(cause != null ? cause : e); + } } } diff --git a/core/src/main/java/cn/qaiu/vx/core/util/ReflectionUtil.java b/core/src/main/java/cn/qaiu/vx/core/util/ReflectionUtil.java index 8cb1b50..95290de 100644 --- a/core/src/main/java/cn/qaiu/vx/core/util/ReflectionUtil.java +++ b/core/src/main/java/cn/qaiu/vx/core/util/ReflectionUtil.java @@ -24,6 +24,7 @@ import java.lang.reflect.Method; import java.net.URL; import java.text.ParseException; import java.util.*; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,12 @@ public final class ReflectionUtil { // 缓存Reflections实例,避免重复扫描(每次扫描约35K+值,耗时1-3秒,占用大量内存) private static final Map REFLECTIONS_CACHE = new java.util.concurrent.ConcurrentHashMap<>(); + // 预编译的类型匹配正则,避免每次请求重新编译 + private static final Pattern BASIC_TYPE_PATTERN = Pattern.compile( + "^java\\.lang\\.((Boolean)|(Character)|(Byte)|(Short)|(Integer)|(Long)|(Float)|(Double)|(String))$"); + private static final Pattern BASIC_TYPE_ARRAY_PATTERN = Pattern.compile( + "^(boolean|char|byte|short|int|long|float|double|String)\\[]$"); + /** * 以默认配置的基础包路径获取反射器 * @@ -234,8 +241,7 @@ public final class ReflectionUtil { if (ctClass.isPrimitive() || "java.util.Date".equals(ctClass.getName())) { return true; } - return ctClass.getName().matches("^java\\.lang\\.((Boolean)|(Character)|(Byte)|(Short)|(Integer)|(Long)|" + - "(Float)|(Double)|(String))$"); + return BASIC_TYPE_PATTERN.matcher(ctClass.getName()).matches(); } /** @@ -246,7 +252,7 @@ public final class ReflectionUtil { public static boolean isBasicTypeArray(CtClass ctClass) { if (!ctClass.isArray()) { return false; - } else return (ctClass.getName().matches("^(boolean|char|byte|short|int|long|float|double|String)\\[]$")); + } else return BASIC_TYPE_ARRAY_PATTERN.matcher(ctClass.getName()).matches(); } /** diff --git a/core/src/main/java/cn/qaiu/vx/core/util/ResponseUtil.java b/core/src/main/java/cn/qaiu/vx/core/util/ResponseUtil.java index e6f259a..9d298f9 100644 --- a/core/src/main/java/cn/qaiu/vx/core/util/ResponseUtil.java +++ b/core/src/main/java/cn/qaiu/vx/core/util/ResponseUtil.java @@ -12,14 +12,21 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; public class ResponseUtil { public static void redirect(HttpServerResponse response, String url) { + if (response.ended() || response.closed()) { + return; + } response.putHeader(CONTENT_TYPE, "text/html; charset=utf-8") .putHeader("Referrer-Policy", "no-referrer") .putHeader(HttpHeaders.LOCATION, url).setStatusCode(302).end(); } public static void redirect(HttpServerResponse response, String url, Promise promise) { - redirect(response, url); - promise.complete(); + try { + redirect(response, url); + promise.tryComplete(); + } catch (Throwable t) { + promise.tryFail(t); + } } public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) { @@ -31,12 +38,18 @@ public class ResponseUtil { } public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) { + if (ctx.response().ended() || ctx.response().closed()) { + return; + } ctx.response().putHeader(CONTENT_TYPE, "application/json; charset=utf-8") .setStatusCode(statusCode) .end(jsonObject.encode()); } public static void fireJsonObjectResponse(HttpServerResponse ctx, JsonObject jsonObject, int statusCode) { + if (ctx.ended() || ctx.closed()) { + return; + } ctx.putHeader(CONTENT_TYPE, "application/json; charset=utf-8") .setStatusCode(statusCode) .end(jsonObject.encode()); @@ -55,10 +68,16 @@ public class ResponseUtil { } public static void fireTextResponse(RoutingContext ctx, String text) { + if (ctx.response().ended() || ctx.response().closed()) { + return; + } ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8").end(text); } public static void sendError(RoutingContext ctx, int statusCode) { + if (ctx.response().ended() || ctx.response().closed()) { + return; + } ctx.response().setStatusCode(statusCode).end(); } } diff --git a/core/src/main/java/cn/qaiu/vx/core/util/SharedDataUtil.java b/core/src/main/java/cn/qaiu/vx/core/util/SharedDataUtil.java index 5181e42..a184caa 100644 --- a/core/src/main/java/cn/qaiu/vx/core/util/SharedDataUtil.java +++ b/core/src/main/java/cn/qaiu/vx/core/util/SharedDataUtil.java @@ -3,7 +3,6 @@ package cn.qaiu.vx.core.util; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.shareddata.LocalMap; -import io.vertx.core.shareddata.SharedData; /** * vertx 共享数据 @@ -13,10 +12,8 @@ import io.vertx.core.shareddata.SharedData; */ public class SharedDataUtil { - private static final SharedData sharedData = VertxHolder.getVertxInstance().sharedData(); - - public static SharedData shareData() { - return sharedData; + public static io.vertx.core.shareddata.SharedData shareData() { + return VertxHolder.getVertxInstance().sharedData(); } public static LocalMap getLocalMap(String key) { @@ -24,7 +21,7 @@ public class SharedDataUtil { } public static LocalMap getLocalMapWithCast(String key) { - return sharedData.getLocalMap(key); + return shareData().getLocalMap(key); } public static JsonObject getJsonConfig(String key) { diff --git a/core/src/main/java/cn/qaiu/vx/core/verticle/HttpProxyVerticle.java b/core/src/main/java/cn/qaiu/vx/core/verticle/HttpProxyVerticle.java index c6fdfc3..3ddff9a 100644 --- a/core/src/main/java/cn/qaiu/vx/core/verticle/HttpProxyVerticle.java +++ b/core/src/main/java/cn/qaiu/vx/core/verticle/HttpProxyVerticle.java @@ -1,10 +1,13 @@ package cn.qaiu.vx.core.verticle; import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.http.*; import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; import io.vertx.core.net.ProxyOptions; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -24,13 +27,16 @@ public class HttpProxyVerticle extends AbstractVerticle { private HttpClient httpClient; private NetClient netClient; + private HttpServer httpServer; + private volatile boolean stopping = false; private JsonObject proxyPreConf; private JsonObject proxyServerConf; @Override - public void start() { + public void start(io.vertx.core.Promise startPromise) { + stopping = false; proxyServerConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-server"); proxyPreConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-pre"); Integer serverPort = proxyServerConf.getInteger("port"); @@ -41,7 +47,12 @@ public class HttpProxyVerticle extends AbstractVerticle { } // 初始化 HTTP 客户端,用于向目标服务器发送 HTTP 请求 - HttpClientOptions httpClientOptions = new HttpClientOptions(); + HttpClientOptions httpClientOptions = new HttpClientOptions() + .setMaxPoolSize(64) + .setMaxWaitQueueSize(256) + .setConnectTimeout(15000) + .setIdleTimeout(60) + .setKeepAlive(true); if (proxyOptions != null) { httpClientOptions.setProxyOptions(proxyOptions); } @@ -54,14 +65,14 @@ public class HttpProxyVerticle extends AbstractVerticle { httpServerOptions.setClientAuth(ClientAuth.REQUIRED); } - HttpServer server = vertx.createHttpServer(); - server.requestHandler(this::handleClientRequest); + httpServer = vertx.createHttpServer(httpServerOptions); + httpServer.requestHandler(this::handleClientRequest); // 初始化 NetClient,用于在 CONNECT 请求中建立 TCP 连接隧道 NetClientOptions netClientOptions = new NetClientOptions(); if (proxyOptions != null) { - httpClientOptions.setProxyOptions(proxyOptions); + netClientOptions.setProxyOptions(proxyOptions); } netClient = vertx.createNetClient(netClientOptions @@ -69,16 +80,22 @@ public class HttpProxyVerticle extends AbstractVerticle { .setTrustAll(true)); // 启动 HTTP 代理服务器 - server.listen(serverPort) - .onSuccess(res-> LOGGER.info("HTTP Proxy server started on port {}", serverPort)) - .onFailure(err-> LOGGER.error("Failed to start HTTP Proxy server: " + err.getMessage())); + httpServer.listen(serverPort) + .onSuccess(res -> { + LOGGER.info("HTTP Proxy server started on port {}", serverPort); + startPromise.complete(); + }) + .onFailure(err -> { + LOGGER.error("Failed to start HTTP Proxy server: " + err.getMessage(), err); + closeClients().onComplete(close -> startPromise.fail(err)); + }); } // 处理 HTTP CONNECT 请求,用于代理 HTTPS 流量 private void handleConnectRequest(HttpServerRequest clientRequest) { String[] uriParts = clientRequest.uri().split(":"); if (uriParts.length != 2) { - clientRequest.response().setStatusCode(400).end("Bad Request: Invalid URI format"); + failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid URI format"); return; } @@ -88,45 +105,61 @@ public class HttpProxyVerticle extends AbstractVerticle { try { targetPort = Integer.parseInt(uriParts[1]); } catch (NumberFormatException e) { - clientRequest.response().setStatusCode(400).end("Bad Request: Invalid port"); + failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid port"); return; } clientRequest.pause(); // 通过 NetClient 连接目标服务器并创建隧道 - netClient.connect(targetPort, targetHost) - .onSuccess(targetSocket -> { - // Upgrade client connection to NetSocket and implement bidirectional data flow - clientRequest.toNetSocket() - .onSuccess(clientSocket -> { - // Set up bidirectional data forwarding - clientSocket.handler(targetSocket::write); - targetSocket.handler(clientSocket::write); + try { + netClient.connect(targetPort, targetHost) + .onSuccess(targetSocket -> { + // Upgrade client connection to NetSocket and implement bidirectional data flow + clientRequest.toNetSocket() + .onSuccess(clientSocket -> { + clientSocket.pipeTo(targetSocket) + .onFailure(err -> { + LOGGER.debug("CONNECT client -> target pipe closed", err); + closeTunnelSockets(clientSocket, targetSocket); + }); + targetSocket.pipeTo(clientSocket) + .onFailure(err -> { + LOGGER.debug("CONNECT target -> client pipe closed", err); + closeTunnelSockets(clientSocket, targetSocket); + }); - // Close the other socket when one side closes - clientSocket.closeHandler(v -> targetSocket.close()); - targetSocket.closeHandler(v -> clientSocket.close()); - }) - .onFailure(clientSocketAttempt -> { - System.err.println("Failed to upgrade client connection to socket: " + clientSocketAttempt.getMessage()); - targetSocket.close(); - clientRequest.response().setStatusCode(500).end("Internal Server Error"); - }); - }) - .onFailure(connectionAttempt -> { - System.err.println("Failed to connect to target: " + connectionAttempt.getMessage()); - clientRequest.response().setStatusCode(502).end("Bad Gateway: Unable to connect to target"); - }); + // Close the other socket when one side closes + clientSocket.closeHandler(v -> targetSocket.close()); + targetSocket.closeHandler(v -> clientSocket.close()); + }) + .onFailure(clientSocketAttempt -> { + System.err.println("Failed to upgrade client connection to socket: " + clientSocketAttempt.getMessage()); + targetSocket.close(); + failClientRequestAndClose(clientRequest, 500, "Internal Server Error"); + }); + }) + .onFailure(connectionAttempt -> { + LOGGER.warn("Failed to connect to target: {}", connectionAttempt.getMessage()); + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to connect to target"); + }); + } catch (Exception e) { + LOGGER.warn("CONNECT 请求创建失败", e); + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to connect to target"); + } } // 处理客户端的 HTTP 请求 private void handleClientRequest(HttpServerRequest clientRequest) { + if (stopping) { + failClientResponse(clientRequest.response(), 503, "Service Unavailable"); + return; + } // 打印来源ip和访问目标URI LOGGER.debug("source: {}, target: {}", clientRequest.remoteAddress().toString(), clientRequest.uri()); if (proxyServerConf.containsKey("username") && StringUtils.isNotBlank(proxyServerConf.getString("username"))) { String s = clientRequest.headers().get("Proxy-Authorization"); if (s == null) { - clientRequest.response().setStatusCode(403).end(); + failClientResponse(clientRequest.response(), 403, null); return; } String[] split; @@ -134,19 +167,19 @@ public class HttpProxyVerticle extends AbstractVerticle { split = new String(Base64.getDecoder().decode(s.replace("Basic ", ""))).split(":"); } catch (IllegalArgumentException e) { LOGGER.warn("Proxy-Authorization header is not valid Base64"); - clientRequest.response().setStatusCode(403).end(); + failClientResponse(clientRequest.response(), 403, null); return; } if (split.length <= 1) { LOGGER.warn("Proxy-Authorization header format invalid: missing username:password separator"); - clientRequest.response().setStatusCode(403).end(); + failClientResponse(clientRequest.response(), 403, null); return; } String username = proxyServerConf.getString("username"); String password = proxyServerConf.getString("password"); if (!split[0].equals(username) || !split[1].equals(password)) { LOGGER.info("-----auth failed------\nusername: {}", split[0]); - clientRequest.response().setStatusCode(403).end(); + failClientResponse(clientRequest.response(), 403, null); return; } } @@ -165,40 +198,147 @@ public class HttpProxyVerticle extends AbstractVerticle { // 获取目标主机 String hostHeader = clientRequest.getHeader("Host"); if (hostHeader == null) { - clientRequest.response().setStatusCode(400).end("Host header is missing"); + failClientResponse(clientRequest.response(), 400, "Host header is missing"); return; } - String targetHost = hostHeader.split(":")[0]; - int targetPort = extractPortFromUrl(clientRequest.uri()); // 默认为 HTTP 的端口 - clientRequest.pause(); // 暂停客户端请求的读取,避免数据丢失 + HostAndPort target; + try { + target = parseHostHeader(hostHeader); + } catch (IllegalArgumentException e) { + failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid Host header"); + return; + } + String targetHost = target.host(); + int targetPort = extractPortFromUrl(clientRequest.uri(), target.port()); // 默认为 HTTP 的端口 + if (targetPort <= 0) { + failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid target port"); + return; + } + clientRequest.pause(); // 暂停客户端请求的读取,等上游请求创建完成 - httpClient.request(clientRequest.method(), targetPort, targetHost, clientRequest.uri()) - .onSuccess(request -> { - clientRequest.resume(); // 恢复客户端请求的读取 + try { + httpClient.request(clientRequest.method(), targetPort, targetHost, clientRequest.uri()) + .onSuccess(request -> { + // 逐个设置请求头 + clientRequest.headers().forEach(header -> request.putHeader(header.getKey(), header.getValue())); - // 逐个设置请求头 - clientRequest.headers().forEach(header -> request.putHeader(header.getKey(), header.getValue())); + request.response() + .onSuccess(response -> { + HttpServerResponse clientResponse = clientRequest.response(); + if (clientResponse.ended() || clientResponse.closed()) { + response.resume(); + return; + } + clientResponse.setStatusCode(response.statusCode()); + clientResponse.headers().setAll(response.headers()); + response.pipeTo(clientResponse) + .onFailure(err -> { + LOGGER.error("HTTP代理响应转发失败", err); + try { + response.request().reset(); + } catch (Exception e) { + LOGGER.debug("HTTP代理上游响应已关闭", e); + } + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target"); + }); + }) + .onFailure(err -> { + LOGGER.error("HTTP代理响应失败", err); + try { + request.reset(); + } catch (Exception e) { + LOGGER.debug("HTTP代理上游请求已关闭", e); + } + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target"); + }); - // 将客户端请求的 body 转发给目标服务器 - clientRequest.bodyHandler(body -> - request.send(body) - .onSuccess(response -> { - clientRequest.response().setStatusCode(response.statusCode()); - clientRequest.response().headers().setAll(response.headers()); - response.body() - .onSuccess(b -> clientRequest.response().end(b)) - .onFailure(err -> clientRequest.response() - .setStatusCode(502).end("Bad Gateway: Unable to reach target")); - }) - .onFailure(err -> clientRequest.response() - .setStatusCode(502).end("Bad Gateway: Unable to reach target")) - ); - }) - .onFailure(err -> { - LOGGER.error("HTTP请求失败", err); - clientRequest.response().setStatusCode(502).end("Bad Gateway: Request failed"); - }); + clientRequest.pipeTo(request) + .onFailure(err -> { + LOGGER.error("HTTP代理请求转发失败", err); + try { + request.reset(); + } catch (Exception e) { + LOGGER.debug("HTTP代理上游请求已关闭", e); + } + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target"); + }); + clientRequest.resume(); + }) + .onFailure(err -> { + LOGGER.error("HTTP请求失败", err); + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Request failed"); + }); + } catch (Exception e) { + LOGGER.error("HTTP请求创建失败", e); + failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Request failed"); + } + } + + private void failClientResponse(HttpServerResponse response, String message) { + failClientResponse(response, 502, message); + } + + private void failClientResponse(HttpServerResponse response, int statusCode, String message) { + if (response.ended() || response.closed()) { + return; + } + try { + if (!response.headWritten()) { + response.setStatusCode(statusCode); + if (message == null) { + response.end(); + } else { + response.end(message); + } + } else { + response.reset(); + } + } catch (Exception e) { + LOGGER.debug("客户端响应已关闭,忽略代理错误响应", e); + } + } + + private void failClientRequestAndClose(HttpServerRequest request, int statusCode, String message) { + HttpServerResponse response = request.response(); + if (response.ended() || response.closed()) { + closeClientConnection(request); + return; + } + try { + if (!response.headWritten()) { + response.setStatusCode(statusCode); + Future endFuture = message == null ? response.end() : response.end(message); + endFuture.onComplete(v -> closeClientConnection(request)); + } else { + response.reset(); + closeClientConnection(request); + } + } catch (Exception e) { + LOGGER.debug("客户端响应已关闭,关闭代理连接", e); + closeClientConnection(request); + } + } + + private void closeClientConnection(HttpServerRequest request) { + try { + request.connection().close(); + } catch (Exception e) { + LOGGER.debug("关闭客户端代理连接失败", e); + } + } + + private void closeTunnelSockets(NetSocket clientSocket, NetSocket targetSocket) { + try { + clientSocket.close(); + } catch (Exception e) { + LOGGER.debug("关闭CONNECT客户端socket失败", e); + } + try { + targetSocket.close(); + } catch (Exception e) { + LOGGER.debug("关闭CONNECT目标socket失败", e); + } } @@ -209,6 +349,10 @@ public class HttpProxyVerticle extends AbstractVerticle { * @return 提取的端口号,如果没有指定端口,则返回默认端口 */ public static int extractPortFromUrl(String urlString) { + return extractPortFromUrl(urlString, 80); + } + + public static int extractPortFromUrl(String urlString, int defaultPort) { try { URI uri = new URI(urlString); int port = uri.getPort(); @@ -217,7 +361,7 @@ public class HttpProxyVerticle extends AbstractVerticle { if ("https".equalsIgnoreCase(uri.getScheme())) { port = 443; // HTTPS 默认端口 } else { - port = 80; // HTTP 默认端口 + port = defaultPort; // HTTP 默认端口 } } return port; @@ -228,16 +372,48 @@ public class HttpProxyVerticle extends AbstractVerticle { } } + private HostAndPort parseHostHeader(String hostHeader) { + if (hostHeader.startsWith("[")) { + int end = hostHeader.indexOf(']'); + if (end > 0) { + String host = hostHeader.substring(1, end); + int port = 80; + if (hostHeader.length() > end + 2 && hostHeader.charAt(end + 1) == ':') { + port = Integer.parseInt(hostHeader.substring(end + 2)); + } + return new HostAndPort(host, port); + } + } + int lastColon = hostHeader.lastIndexOf(':'); + if (lastColon > 0 && hostHeader.indexOf(':') == lastColon) { + return new HostAndPort(hostHeader.substring(0, lastColon), Integer.parseInt(hostHeader.substring(lastColon + 1))); + } + return new HostAndPort(hostHeader, 80); + } + + private record HostAndPort(String host, int port) { + } + @Override - public void stop() { - // 停止 HTTP 客户端以释放资源 - if (httpClient != null) { - httpClient.close(); - } - if (netClient != null) { - netClient.close(); - } + public void stop(Promise stopPromise) { + stopping = true; + Future serverClose = httpServer == null ? Future.succeededFuture() : httpServer.close(); + serverClose.onComplete(serverResult -> closeClients().onComplete(clientResult -> { + if (serverResult.failed()) { + stopPromise.fail(serverResult.cause()); + } else if (clientResult.failed()) { + stopPromise.fail(clientResult.cause()); + } else { + stopPromise.complete(); + } + })); + } + + private Future closeClients() { + Future httpClientClose = httpClient == null ? Future.succeededFuture() : httpClient.close(); + Future netClientClose = netClient == null ? Future.succeededFuture() : netClient.close(); + return Future.all(httpClientClose, netClientClose).mapEmpty(); } } diff --git a/core/src/main/java/cn/qaiu/vx/core/verticle/ReverseProxyVerticle.java b/core/src/main/java/cn/qaiu/vx/core/verticle/ReverseProxyVerticle.java index 7715758..7a59291 100644 --- a/core/src/main/java/cn/qaiu/vx/core/verticle/ReverseProxyVerticle.java +++ b/core/src/main/java/cn/qaiu/vx/core/verticle/ReverseProxyVerticle.java @@ -3,17 +3,20 @@ package cn.qaiu.vx.core.verticle; import cn.qaiu.vx.core.util.*; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; +import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.StaticHandler; import io.vertx.ext.web.proxy.handler.ProxyHandler; import io.vertx.httpproxy.HttpProxy; @@ -27,7 +30,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -45,10 +50,6 @@ public class ReverseProxyVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(ReverseProxyVerticle.class); - private static final String PATH_PROXY_CONFIG = SharedDataUtil - .getJsonConfig(ConfigConstant.GLOBAL_CONFIG) - .getString("proxyConf"); - private static final Future CONFIG = ConfigUtil.readYamlConfig(PATH_PROXY_CONFIG); private static final String DEFAULT_PATH_404 = "webroot/err/page404.html"; private static String serverName = "Vert.x-proxy-server"; //Server name in Http response header @@ -58,26 +59,40 @@ public class ReverseProxyVerticle extends AbstractVerticle { /** * 【优化】HttpClient连接池,按host:port缓存复用,避免每个请求都创建新连接 */ - private final Map httpClientPool = new ConcurrentHashMap<>(); + private final Map httpClientPool = new ConcurrentHashMap<>(); + private final List httpServers = new ArrayList<>(); + private volatile boolean stopping = false; + + /** + * 连接池条目。HttpProxy 会持有这里的 HttpClient 引用,不能在路由仍可用时关闭。 + */ + private static class HttpClientEntry { + final HttpClient client; + + HttpClientEntry(HttpClient client) { + this.client = client; + } + } /** * 【优化】高并发场景下的HttpClient配置 */ - private static final int MAX_POOL_SIZE = 100; // 最大连接池大小 - private static final int MAX_WAIT_QUEUE_SIZE = 500; // 最大等待队列大小 + private static final int MAX_POOL_SIZE = 32; // 最大连接池大小 + private static final int MAX_WAIT_QUEUE_SIZE = 128; // 最大等待队列大小 private static final int CONNECT_TIMEOUT = 30000; // 连接超时30秒 private static final int IDLE_TIMEOUT = 60; // 空闲超时60秒 private static final boolean KEEP_ALIVE = true; // 启用Keep-Alive - private static final boolean PIPELINING = true; // 启用HTTP管线化 - - + private static final boolean PIPELINING = false; // 代理场景关闭管线化,避免慢响应堆积 @Override public void start(Promise startPromise) { - CONFIG.onSuccess(this::handleProxyConfList).onFailure(e -> { + stopping = false; + String pathProxyConfig = SharedDataUtil + .getJsonConfig(ConfigConstant.GLOBAL_CONFIG) + .getString("proxyConf"); + ConfigUtil.readYamlConfig(pathProxyConfig).onSuccess(config -> startProxyServers(config).onComplete(startPromise)).onFailure(e -> { LOGGER.info("web代理配置已禁用,当前仅支持API调用"); + startPromise.complete(); }); -// createFileListener - startPromise.complete(); } /** @@ -85,16 +100,49 @@ public class ReverseProxyVerticle extends AbstractVerticle { */ @Override public void stop(Promise stopPromise) { - LOGGER.info("Stopping ReverseProxyVerticle, closing {} HttpClient connections...", httpClientPool.size()); - httpClientPool.values().forEach(client -> { + stopping = true; + LOGGER.info("Stopping ReverseProxyVerticle, closing {} servers and {} HttpClient connections...", + httpServers.size(), httpClientPool.size()); + + List> serverCloseFutures = new ArrayList<>(); + httpServers.forEach(server -> serverCloseFutures.add(server.close())); + Future serverCloseFuture = serverCloseFutures.isEmpty() + ? Future.succeededFuture() + : Future.all(serverCloseFutures).mapEmpty(); + + serverCloseFuture.onComplete(serverClose -> { + List> clientCloseFutures = new ArrayList<>(); + closeHttpClients(clientCloseFutures); + Future clientCloseFuture = clientCloseFutures.isEmpty() + ? Future.succeededFuture() + : Future.all(clientCloseFutures).mapEmpty(); + + clientCloseFuture.onComplete(clientClose -> { + if (serverClose.succeeded()) { + httpServers.clear(); + } + if (clientClose.succeeded()) { + httpClientPool.clear(); + } + if (serverClose.failed()) { + stopPromise.fail(serverClose.cause()); + } else if (clientClose.failed()) { + stopPromise.fail(clientClose.cause()); + } else { + stopPromise.complete(); + } + }); + }); + } + + private void closeHttpClients(List> closeFutures) { + httpClientPool.values().forEach(entry -> { try { - client.close(); + closeFutures.add(entry.client.close()); } catch (Exception e) { LOGGER.warn("Error closing HttpClient: {}", e.getMessage()); } }); - httpClientPool.clear(); - stopPromise.complete(); } /** @@ -105,7 +153,7 @@ public class ReverseProxyVerticle extends AbstractVerticle { */ private HttpClient getOrCreateHttpClient(String host, int port) { String key = host + ":" + port; - return httpClientPool.computeIfAbsent(key, k -> { + HttpClientEntry entry = httpClientPool.computeIfAbsent(key, k -> { LOGGER.info("Creating new HttpClient for {}", key); HttpClientOptions options = new HttpClientOptions() .setMaxPoolSize(MAX_POOL_SIZE) // 连接池大小 @@ -116,15 +164,16 @@ public class ReverseProxyVerticle extends AbstractVerticle { .setKeepAliveTimeout(120) // Keep-Alive超时120秒 .setPipelining(PIPELINING) // HTTP管线化 .setPipeliningLimit(10) // 管线化限制 - .setDecompressionSupported(true) // 支持解压响应 + .setDecompressionSupported(false) // 代理不解压,避免放大内存 .setTcpKeepAlive(true) // TCP Keep-Alive .setTcpNoDelay(true) // 禁用Nagle算法,降低延迟 .setTcpFastOpen(true) // 启用TCP Fast Open .setTcpQuickAck(true) // 启用TCP Quick ACK .setReuseAddress(true) // 允许地址重用 .setReusePort(true); // 允许端口重用 - return vertx.createHttpClient(options); + return new HttpClientEntry(vertx.createHttpClient(options)); }); + return entry.client; } /** @@ -137,7 +186,7 @@ public class ReverseProxyVerticle extends AbstractVerticle { * * @param config proxy config */ - private void handleProxyConfList(JsonObject config) { + private Future startProxyServers(JsonObject config) { serverName = config.getString("server-name"); // 解析全局 trusted-proxies JsonArray trustedArr = config.getJsonArray("trusted-proxies"); @@ -149,13 +198,15 @@ public class ReverseProxyVerticle extends AbstractVerticle { }); } JsonArray proxyConfList = config.getJsonArray("proxy"); + List> listenFutures = new ArrayList<>(); if (proxyConfList != null) { proxyConfList.forEach(proxyConf -> { if (proxyConf instanceof JsonObject) { - handleProxyConf((JsonObject) proxyConf); + listenFutures.add(handleProxyConf((JsonObject) proxyConf)); } }); } + return listenFutures.isEmpty() ? Future.succeededFuture() : Future.all(listenFutures).mapEmpty(); } /** @@ -201,7 +252,7 @@ public class ReverseProxyVerticle extends AbstractVerticle { * * @param proxyConf 代理配置 */ - private void handleProxyConf(JsonObject proxyConf) { + private Future handleProxyConf(JsonObject proxyConf) { // page404 path if (proxyConf.containsKey( @@ -226,6 +277,10 @@ public class ReverseProxyVerticle extends AbstractVerticle { // Add Server name header proxyRouter.route().handler(ctx -> { + if (stopping) { + sendProxyError(ctx, 503, "Service Unavailable"); + return; + } String realPath = ctx.request().uri(); if (realPath.startsWith(REROUTE_PATH_PREFIX)) { // vertx web proxy暂不支持rewrite, 所以这里进行手动替换, 请求地址中的请求path前缀替换为originPath @@ -234,7 +289,9 @@ public class ReverseProxyVerticle extends AbstractVerticle { return; } - ctx.response().putHeader("Server", serverName); + if (!ctx.response().ended() && !ctx.response().closed()) { + ctx.response().putHeader("Server", serverName); + } ctx.next(); }); @@ -250,15 +307,19 @@ public class ReverseProxyVerticle extends AbstractVerticle { // Send page404 page proxyRouter.errorHandler(404, ctx -> { - ctx.response().sendFile(proxyConf.getString("page404")); + sendNotFoundPage(ctx, proxyConf.getString("page404")); }); + proxyRouter.errorHandler(500, this::handleProxyFailure); HttpServer server = getHttpsServer(proxyConf); server.requestHandler(proxyRouter); Integer port = proxyConf.getInteger("listen"); LOGGER.info("proxy server start on {} port", port); - server.listen(port); + return server.listen(port) + .onSuccess(s -> httpServers.add(s)) + .onFailure(e -> LOGGER.error("proxy server start failed on {} port", port, e)) + .mapEmpty(); } private HttpServer getHttpsServer(JsonObject proxyConf) { @@ -267,7 +328,7 @@ public class ReverseProxyVerticle extends AbstractVerticle { .setTcpKeepAlive(true) // TCP Keep-Alive .setTcpNoDelay(true) // 禁用Nagle算法 .setCompressionSupported(true) // 启用压缩 - .setAcceptBacklog(50000) // 增加积压队列到50000 + .setAcceptBacklog(1024) // 限制积压队列,避免小容器内存膨胀 .setIdleTimeout(120) // 空闲超时120秒 .setTcpFastOpen(true) // 启用TCP Fast Open .setTcpQuickAck(true) // 启用TCP Quick ACK @@ -303,6 +364,67 @@ public class ReverseProxyVerticle extends AbstractVerticle { return vertx.createHttpServer(httpServerOptions); } + private void addProxyHandler(Route route, HttpProxy httpProxy) { + Handler proxyHandler = ProxyHandler.create(httpProxy); + route.handler(ctx -> { + try { + proxyHandler.handle(ctx); + } catch (Throwable t) { + LOGGER.error("反向代理处理异常", t); + ctx.fail(t); + } + }).failureHandler(this::handleProxyFailure); + } + + private void handleProxyFailure(RoutingContext ctx) { + Throwable failure = ctx.failure(); + if (failure != null) { + LOGGER.error("反向代理路由失败", failure); + } + int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : 502; + if (statusCode < 400) { + statusCode = 502; + } + sendProxyError(ctx, statusCode, "Bad Gateway"); + } + + private void sendNotFoundPage(RoutingContext ctx, String page404) { + HttpServerResponse response = ctx.response(); + if (response.ended() || response.closed()) { + return; + } + try { + if (response.headWritten()) { + response.reset(); + return; + } + response.sendFile(page404) + .onFailure(e -> { + LOGGER.warn("发送代理 404 页面失败: {}", page404, e); + sendProxyError(ctx, 404, "404 not found"); + }); + } catch (Exception e) { + LOGGER.warn("发送代理 404 页面异常: {}", page404, e); + sendProxyError(ctx, 404, "404 not found"); + } + } + + private void sendProxyError(RoutingContext ctx, int statusCode, String message) { + HttpServerResponse response = ctx.response(); + if (response.ended() || response.closed()) { + return; + } + try { + if (!response.headWritten()) { + response.setStatusCode(statusCode).end(message); + } else { + response.reset(); + } + } catch (Exception e) { + LOGGER.debug("代理响应已关闭,忽略错误响应", e); + } + } + /** * 处理静态资源配置 * @@ -391,15 +513,14 @@ public class ReverseProxyVerticle extends AbstractVerticle { if (StringUtils.isEmpty(originPath) || path.equals(originPath)) { Route route = path.startsWith("~") ? proxyRouter.routeWithRegex(path.substring(1)) : proxyRouter.route(path); - // 【优化】为代理处理器添加超时 - route.handler(ProxyHandler.create(httpProxy)); + addProxyHandler(route, httpProxy); } else { // 配置 /api/, / => 请求 /api/test 代理后 /test // 配置 /api/, /xxx => 请求 /api/test 代理后 /xxx/test final String path0 = path; final String originPath0 = REROUTE_PATH_PREFIX + originPath; - proxyRouter.route(originPath0 + "*").handler(ProxyHandler.create(httpProxy)); + addProxyHandler(proxyRouter.route(originPath0 + "*"), httpProxy); proxyRouter.route(path0 + "*").handler(ctx -> { String realPath = ctx.request().uri(); if (realPath.startsWith(path0)) { diff --git a/core/src/main/java/cn/qaiu/vx/core/verticle/RouterVerticle.java b/core/src/main/java/cn/qaiu/vx/core/verticle/RouterVerticle.java index b85a484..6fae9e8 100644 --- a/core/src/main/java/cn/qaiu/vx/core/verticle/RouterVerticle.java +++ b/core/src/main/java/cn/qaiu/vx/core/verticle/RouterVerticle.java @@ -22,21 +22,22 @@ public class RouterVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(RouterVerticle.class); - private static final int port = SharedDataUtil.getValueForServerConfig("port"); - - private static final JsonObject globalConfig = SharedDataUtil.getJsonConfig("globalConfig"); - private HttpServer server; private Router router; + private int port; + private JsonObject globalConfig; static { LOGGER.info(JacksonConfig.class.getSimpleName() + " >> "); JacksonConfig.nothing(); - LOGGER.info("To start listening to port {} ......", port); } @Override public void start(Promise startPromise) { + port = SharedDataUtil.getValueForServerConfig("port"); + globalConfig = SharedDataUtil.getJsonConfig("globalConfig"); + LOGGER.info("To start listening to port {} ......", port); + // 端口是否占用 if (CommonUtil.isPortUsing(port)) { throw new RuntimeException("Start fail: the '" + port + "' port is already in use..."); @@ -64,7 +65,7 @@ public class RouterVerticle extends AbstractVerticle { SharedDataUtil.getJsonStringForServerConfig("contextPath")).createRouter(); server = vertx.createHttpServer(options); - server.requestHandler(router).webSocketHandler(s->{}).listen() + server.requestHandler(router).listen() .onSuccess(s -> startPromise.complete()) .onFailure(e -> startPromise.fail(e.getCause())); }