fix(core): harden proxy and routing lifecycle

This commit is contained in:
yukaidi
2026-06-10 21:19:18 +08:00
parent 1ef6e120a8
commit 46c6827eda
11 changed files with 644 additions and 174 deletions

View File

@@ -21,6 +21,8 @@ import java.nio.file.Path;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.UUID; import java.util.UUID;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import static cn.qaiu.vx.core.util.ConfigConstant.*; import static cn.qaiu.vx.core.util.ConfigConstant.*;
@@ -45,11 +47,25 @@ public final class Deploy {
private Handler<JsonObject> handle; private Handler<JsonObject> handle;
private Thread mainThread; private Thread mainThread;
private final List<Runnable> preShutdownTasks = new CopyOnWriteArrayList<>();
private final List<Runnable> postShutdownTasks = new CopyOnWriteArrayList<>();
public static Deploy instance() { public static Deploy instance() {
return INSTANCE; return INSTANCE;
} }
public void addPreShutdownTask(Runnable task) {
if (task != null) {
preShutdownTasks.add(task);
}
}
public void addPostShutdownTask(Runnable task) {
if (task != null) {
postShutdownTasks.add(task);
}
}
/** /**
* *
* @param args 启动参数 * @param args 启动参数
@@ -133,9 +149,16 @@ public final class Deploy {
customConfig = globalConfig.getJsonObject(CUSTOM); customConfig = globalConfig.getJsonObject(CUSTOM);
JsonObject vertxConfig = globalConfig.getJsonObject(VERTX); JsonObject vertxConfig = globalConfig.getJsonObject(VERTX);
Integer vertxConfigELPS = vertxConfig.getInteger(EVENT_LOOP_POOL_SIZE); JsonObject vertxOptionsConfig = vertxConfig.copy();
var vertxOptions = vertxConfigELPS == 0 ? if (vertxOptionsConfig.getInteger(EVENT_LOOP_POOL_SIZE, 0) == 0) {
new VertxOptions() : new VertxOptions(vertxConfig); vertxOptionsConfig.remove(EVENT_LOOP_POOL_SIZE);
}
if (vertxOptionsConfig.getInteger("workerPoolSize", 0) == 0) {
vertxOptionsConfig.remove("workerPoolSize");
}
Integer vertxConfigELPS = vertxConfig.getInteger(EVENT_LOOP_POOL_SIZE, 0);
var vertxOptions = vertxOptionsConfig.isEmpty() ?
new VertxOptions() : new VertxOptions(vertxOptionsConfig);
// vertxOptions.setAddressResolverOptions( // vertxOptions.setAddressResolverOptions(
// new AddressResolverOptions(). // new AddressResolverOptions().
@@ -151,12 +174,16 @@ public final class Deploy {
// 注册 ShutdownHook确保进程退出时优雅关闭资源 // 注册 ShutdownHook确保进程退出时优雅关闭资源
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("JVM shutting down, closing Vert.x..."); LOGGER.info("JVM shutting down...");
runShutdownTasks("before Vert.x close", preShutdownTasks);
try { try {
LOGGER.info("Closing Vert.x...");
vertx.close().toCompletionStage().toCompletableFuture().get(10, java.util.concurrent.TimeUnit.SECONDS); vertx.close().toCompletionStage().toCompletableFuture().get(10, java.util.concurrent.TimeUnit.SECONDS);
LOGGER.info("Vert.x closed successfully"); LOGGER.info("Vert.x closed successfully");
} catch (Exception e) { } catch (Exception e) {
LOGGER.warn("Vert.x close error or timeout", e); LOGGER.warn("Vert.x close error or timeout", e);
} finally {
runShutdownTasks("after Vert.x close", postShutdownTasks);
} }
})); }));
//配置保存在共享数据中 //配置保存在共享数据中
@@ -165,24 +192,25 @@ public final class Deploy {
localMap.put(GLOBAL_CONFIG, globalConfig); localMap.put(GLOBAL_CONFIG, globalConfig);
localMap.put(CUSTOM_CONFIG, customConfig); localMap.put(CUSTOM_CONFIG, customConfig);
localMap.put(SERVER, globalConfig.getJsonObject(SERVER)); localMap.put(SERVER, globalConfig.getJsonObject(SERVER));
var future0 = vertx.createSharedWorkerExecutor("other-handle") WorkerExecutor otherHandleExecutor = vertx.createSharedWorkerExecutor("other-handle");
.executeBlocking(() -> { var future0 = otherHandleExecutor.executeBlocking(() -> {
handle.handle(globalConfig); handle.handle(globalConfig);
return "Other handle complete"; return "Other handle complete";
}); });
future0.onSuccess(res -> { future0.onSuccess(res -> {
otherHandleExecutor.close();
LOGGER.info(res); LOGGER.info(res);
// 部署 路由、异步service、反向代理 服务 // 部署 路由、异步service、反向代理 服务
var future1 = vertx.deployVerticle(RouterVerticle.class, getWorkDeploymentOptions("Router")); var future1 = vertx.deployVerticle(RouterVerticle.class, getWorkDeploymentOptions("Router"));
var future2 = vertx.deployVerticle(ServiceVerticle.class, getWorkDeploymentOptions("Service")); var future2 = vertx.deployVerticle(ServiceVerticle.class, getWorkDeploymentOptions("Service"));
var future3 = vertx.deployVerticle(ReverseProxyVerticle.class, getWorkDeploymentOptions("proxy")); var future3 = vertx.deployVerticle(ReverseProxyVerticle.class, getWorkDeploymentOptions("proxy", 1));
JsonObject jsonObject = ((JsonObject) localMap.get(GLOBAL_CONFIG)).getJsonObject("proxy-server"); JsonObject jsonObject = ((JsonObject) localMap.get(GLOBAL_CONFIG)).getJsonObject("proxy-server");
if (jsonObject != null) { if (jsonObject != null) {
genPwd(jsonObject); genPwd(jsonObject);
var future4 = vertx.deployVerticle(HttpProxyVerticle.class, getWorkDeploymentOptions("proxy")); var future4 = vertx.deployVerticle(HttpProxyVerticle.class, getWorkDeploymentOptions("proxy", 1));
future4.onSuccess(LOGGER::info); future4.onSuccess(LOGGER::info);
future4.onFailure(e -> LOGGER.error("Other handle error", e)); future4.onFailure(e -> LOGGER.error("Other handle error", e));
Future.all(future1, future2, future3, future4) Future.all(future1, future2, future3, future4)
@@ -194,7 +222,10 @@ public final class Deploy {
.onFailure(this::deployVerticalFailed); .onFailure(this::deployVerticalFailed);
} }
}).onFailure(e -> LOGGER.error("Other handle error", e)); }).onFailure(e -> {
otherHandleExecutor.close();
LOGGER.error("Other handle error", e);
});
} }
private static void genPwd(JsonObject jsonObject) { private static void genPwd(JsonObject jsonObject) {
@@ -211,6 +242,21 @@ public final class Deploy {
jsonObject.getString("password")); jsonObject.getString("password"));
LOGGER.info("==============server info================"); LOGGER.info("==============server info================");
} }
private static void runShutdownTasks(String stage, List<Runnable> tasks) {
if (tasks.isEmpty()) {
return;
}
LOGGER.info("Running {} shutdown tasks: {}", stage, tasks.size());
for (Runnable task : tasks) {
try {
task.run();
} catch (Exception e) {
LOGGER.warn("Shutdown task failed at stage {}", stage, e);
}
}
}
/** /**
* 部署失败 * 部署失败
* *

View File

@@ -1,14 +1,20 @@
package cn.qaiu.vx.core.base; package cn.qaiu.vx.core.base;
import cn.qaiu.vx.core.annotaions.HandleSortFilter;
import cn.qaiu.vx.core.interceptor.AfterInterceptor; import cn.qaiu.vx.core.interceptor.AfterInterceptor;
import cn.qaiu.vx.core.model.JsonResult; import cn.qaiu.vx.core.model.JsonResult;
import cn.qaiu.vx.core.util.CommonUtil;
import cn.qaiu.vx.core.util.ReflectionUtil; import cn.qaiu.vx.core.util.ReflectionUtil;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
import org.reflections.Reflections; import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import static cn.qaiu.vx.core.util.ResponseUtil.*; import static cn.qaiu.vx.core.util.ResponseUtil.*;
@@ -22,9 +28,10 @@ public interface BaseHttpApi {
// 需要扫描注册的Router路径 // 需要扫描注册的Router路径
Reflections reflections = ReflectionUtil.getReflections(); Reflections reflections = ReflectionUtil.getReflections();
Logger LOGGER = LoggerFactory.getLogger(BaseHttpApi.class);
default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) { default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) {
if (!ctx.response().ended()) { if (!isResponseDone(ctx)) {
fireJsonObjectResponse(ctx, jsonObject); fireJsonObjectResponse(ctx, jsonObject);
} }
handleAfterInterceptor(ctx, jsonObject); handleAfterInterceptor(ctx, jsonObject);
@@ -32,14 +39,14 @@ public interface BaseHttpApi {
default <T> void doFireJsonResultResponse(RoutingContext ctx, JsonResult<T> jsonResult) { default <T> void doFireJsonResultResponse(RoutingContext ctx, JsonResult<T> jsonResult) {
if (!ctx.response().ended()) { if (!isResponseDone(ctx)) {
fireJsonResultResponse(ctx, jsonResult); fireJsonResultResponse(ctx, jsonResult);
} }
handleAfterInterceptor(ctx, jsonResult.toJsonObject()); handleAfterInterceptor(ctx, jsonResult.toJsonObject());
} }
default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) { default void doFireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) {
if (!ctx.response().ended()) { if (!isResponseDone(ctx)) {
fireJsonObjectResponse(ctx, jsonObject, statusCode); fireJsonObjectResponse(ctx, jsonObject, statusCode);
} }
handleAfterInterceptor(ctx, jsonObject); handleAfterInterceptor(ctx, jsonObject);
@@ -47,30 +54,78 @@ public interface BaseHttpApi {
default <T> void doFireJsonResultResponse(RoutingContext ctx, JsonResult<T> jsonResult, int statusCode) { default <T> void doFireJsonResultResponse(RoutingContext ctx, JsonResult<T> jsonResult, int statusCode) {
if (!ctx.response().ended()) { if (!isResponseDone(ctx)) {
fireJsonResultResponse(ctx, jsonResult, statusCode); fireJsonResultResponse(ctx, jsonResult, statusCode);
} }
handleAfterInterceptor(ctx, jsonResult.toJsonObject()); handleAfterInterceptor(ctx, jsonResult.toJsonObject());
} }
default Set<AfterInterceptor> getAfterInterceptor() { default Set<AfterInterceptor> getAfterInterceptor() {
return AfterInterceptorHolder.INSTANCES;
}
class AfterInterceptorHolder {
private static final Set<AfterInterceptor> INSTANCES = loadAfterInterceptors();
private static Set<AfterInterceptor> loadAfterInterceptors() {
Set<Class<? extends AfterInterceptor>> afterInterceptorClassSet = Set<Class<? extends AfterInterceptor>> afterInterceptorClassSet =
reflections.getSubTypesOf(AfterInterceptor.class); reflections.getSubTypesOf(AfterInterceptor.class);
if (afterInterceptorClassSet == null) { if (afterInterceptorClassSet == null || afterInterceptorClassSet.isEmpty()) {
return Collections.emptySet();
}
return afterInterceptorClassSet.stream()
.filter(AfterInterceptorHolder::isEnabled)
.sorted(AfterInterceptorHolder::compareOrder)
.map(AfterInterceptorHolder::newInterceptor)
.filter(Objects::nonNull)
.collect(Collectors.collectingAndThen(
Collectors.toCollection(LinkedHashSet::new),
Collections::unmodifiableSet));
}
private static boolean isEnabled(Class<? extends AfterInterceptor> clazz) {
HandleSortFilter sort = clazz.getAnnotation(HandleSortFilter.class);
return sort == null || sort.value() >= 0;
}
private static int compareOrder(Class<? extends AfterInterceptor> left, Class<? extends AfterInterceptor> right) {
return Integer.compare(order(left), order(right));
}
private static int order(Class<? extends AfterInterceptor> clazz) {
HandleSortFilter sort = clazz.getAnnotation(HandleSortFilter.class);
return sort == null ? 0 : sort.value();
}
private static AfterInterceptor newInterceptor(Class<? extends AfterInterceptor> clazz) {
try {
return ReflectionUtil.newWithNoParam(clazz);
} catch (Exception e) {
LOGGER.warn("AfterInterceptor 初始化失败,已跳过: {}", clazz.getName(), e);
return null; return null;
} }
return CommonUtil.sortClassSet(afterInterceptorClassSet); }
} }
default void handleAfterInterceptor(RoutingContext ctx, JsonObject jsonObject) { default void handleAfterInterceptor(RoutingContext ctx, JsonObject jsonObject) {
Set<AfterInterceptor> afterInterceptor = getAfterInterceptor(); if (ctx.response().closed()) {
if (afterInterceptor != null) { return;
afterInterceptor.forEach(ai -> ai.handle(ctx, jsonObject));
} }
if (!ctx.response().ended()) { Set<AfterInterceptor> afterInterceptor = getAfterInterceptor();
afterInterceptor.forEach(ai -> {
try {
ai.handle(ctx, jsonObject);
} catch (Exception e) {
LOGGER.warn("AfterInterceptor 执行失败: {}", ai.getClass().getName(), e);
}
});
if (!isResponseDone(ctx)) {
fireTextResponse(ctx, "handleAfterInterceptor: response not end"); fireTextResponse(ctx, "handleAfterInterceptor: response not end");
} }
} }
default boolean isResponseDone(RoutingContext ctx) {
return ctx.response().ended() || ctx.response().closed();
}
} }

View File

@@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -96,7 +97,9 @@ public class RouterHandlerFactory implements BaseHttpApi {
mainRouter.route().handler(CorsHandler.create().addRelativeOrigin(".*").allowCredentials(true).allowedMethods(httpMethods)); mainRouter.route().handler(CorsHandler.create().addRelativeOrigin(".*").allowCredentials(true).allowedMethods(httpMethods));
// 配置文件上传路径 // 配置文件上传路径
mainRouter.route().handler(BodyHandler.create().setUploadsDirectory("uploads")); mainRouter.route().handler(BodyHandler.create()
.setUploadsDirectory("uploads")
.setBodyLimit(2L * 1024 * 1024));
// 拦截器 // 拦截器
Set<Handler<RoutingContext>> interceptorSet = getInterceptorSet(); Set<Handler<RoutingContext>> interceptorSet = getInterceptorSet();
@@ -175,7 +178,7 @@ public class RouterHandlerFactory implements BaseHttpApi {
route.handler(TimeoutHandler.create(SharedDataUtil.getCustomConfig().getInteger(ROUTE_TIME_OUT))); route.handler(TimeoutHandler.create(SharedDataUtil.getCustomConfig().getInteger(ROUTE_TIME_OUT)));
route.handler(ResponseTimeHandler.create()); route.handler(ResponseTimeHandler.create());
route.handler(ctx -> handlerMethod(instance, method, ctx)).failureHandler(ctx -> { route.handler(ctx -> handlerMethod(instance, method, ctx)).failureHandler(ctx -> {
if (ctx.response().ended()) return; if (isResponseDone(ctx)) return;
// 超时处理器状态码503 // 超时处理器状态码503
if (ctx.statusCode() == 503 || ctx.failure() == null) { if (ctx.statusCode() == 503 || ctx.failure() == null) {
doFireJsonResultResponse(ctx, JsonResult.error("未知异常, 请联系管理员"), 503); doFireJsonResultResponse(ctx, JsonResult.error("未知异常, 请联系管理员"), 503);
@@ -394,26 +397,34 @@ public class RouterHandlerFactory implements BaseHttpApi {
if (data instanceof JsonResult jsonResult) { if (data instanceof JsonResult jsonResult) {
doFireJsonResultResponse(ctx, (JsonResult<?>) data, jsonResult.getCode()); doFireJsonResultResponse(ctx, (JsonResult<?>) data, jsonResult.getCode());
} } else if (data instanceof JsonObject) {
if (data instanceof JsonObject) {
doFireJsonObjectResponse(ctx, ((JsonObject) data)); doFireJsonObjectResponse(ctx, ((JsonObject) data));
} else if (data instanceof Future) { // 处理异步响应 } else if (data instanceof Future) { // 处理异步响应
((Future<?>) data).onSuccess(res -> { Future<?> responseFuture = (Future<?>) data;
AtomicReference<RoutingContext> ctxRef = new AtomicReference<>(ctx);
ctx.addEndHandler(v -> ctxRef.set(null));
responseFuture.onComplete(ar -> {
RoutingContext responseCtx = ctxRef.getAndSet(null);
if (responseCtx == null || isResponseDone(responseCtx)) {
return;
}
if (ar.succeeded()) {
Object res = ar.result();
if (res instanceof JsonResult jsonResult) { if (res instanceof JsonResult jsonResult) {
doFireJsonResultResponse(ctx, jsonResult, jsonResult.getCode()); doFireJsonResultResponse(responseCtx, jsonResult, jsonResult.getCode());
} } else if (res instanceof JsonObject) {
if (res instanceof JsonObject) { doFireJsonObjectResponse(responseCtx, ((JsonObject) res));
doFireJsonObjectResponse(ctx, ((JsonObject) res));
} else if (res != null) { } else if (res != null) {
doFireJsonResultResponse(ctx, JsonResult.data(res)); doFireJsonResultResponse(responseCtx, JsonResult.data(res));
} else { } else {
doFireJsonResultResponse(ctx, JsonResult.data(null)); doFireJsonResultResponse(responseCtx, JsonResult.data(null));
} }
} else {
}).onFailure(e -> { Throwable e = ar.cause();
LOGGER.error("请求处理失败", e); LOGGER.error("请求处理失败", e);
String msg = e.getMessage() != null ? e.getMessage() : "服务器内部错误"; String msg = e != null && e.getMessage() != null ? e.getMessage() : "服务器内部错误";
doFireJsonResultResponse(ctx, JsonResult.error(msg), 500); doFireJsonResultResponse(responseCtx, JsonResult.error(msg), 500);
}
}); });
} else { } else {
doFireJsonResultResponse(ctx, JsonResult.data(data)); doFireJsonResultResponse(ctx, JsonResult.data(data));

View File

@@ -17,6 +17,8 @@ import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -29,6 +31,16 @@ public class CommonUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonUtil.class); private static final Logger LOGGER = LoggerFactory.getLogger(CommonUtil.class);
/** 正则表达式缓存,避免每次调用重新编译 */
private static final ConcurrentHashMap<String, Pattern> PATTERN_CACHE = new ConcurrentHashMap<>();
/**
* 获取预编译的 Pattern带缓存
*/
private static Pattern getCachedPattern(String regex) {
return PATTERN_CACHE.computeIfAbsent(regex, Pattern::compile);
}
/** /**
* 匹配正则list * 匹配正则list
* *
@@ -39,7 +51,7 @@ public class CommonUtil {
public static boolean matchRegList(List<?> regList, String destStr) { public static boolean matchRegList(List<?> regList, String destStr) {
// 判断是否忽略 // 判断是否忽略
for (Object ignores : regList) { for (Object ignores : regList) {
if (destStr.matches(ignores.toString())) { if (getCachedPattern(ignores.toString()).matcher(destStr).matches()) {
return true; return true;
} }
} }
@@ -147,11 +159,13 @@ public class CommonUtil {
public static String getAppVersion() { public static String getAppVersion() {
if (null == appVersion) { if (null == appVersion) {
Properties properties = new Properties(); Properties properties = new Properties();
try { try (var is = CommonUtil.class.getClassLoader().getResourceAsStream("app.properties")) {
properties.load(CommonUtil.class.getClassLoader().getResourceAsStream("app.properties")); if (is != null) {
properties.load(is);
if (!properties.isEmpty()) { if (!properties.isEmpty()) {
appVersion = properties.getProperty("app.version") + "build" + properties.getProperty("build"); appVersion = properties.getProperty("app.version") + "build" + properties.getProperty("build");
} }
}
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("读取app.properties失败", e); LOGGER.error("读取app.properties失败", e);
} }

View File

@@ -4,17 +4,41 @@ import io.vertx.core.Future;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureUtils { public class FutureUtils {
/** 默认同步等待超时时间(秒) */
private static final long DEFAULT_TIMEOUT_SECONDS = 120;
public static <T> T getResult(Future<T> future) { public static <T> T getResult(Future<T> future) {
try { try {
return future.toCompletionStage().toCompletableFuture().get(); return future.toCompletionStage().toCompletableFuture()
} catch (InterruptedException | ExecutionException e) { .get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
throw new RuntimeException(e); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
} catch (TimeoutException e) {
throw new RuntimeException("等待Future超时" + DEFAULT_TIMEOUT_SECONDS + "秒)", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new RuntimeException(cause != null ? cause : e);
} }
} }
public static <T> T getResult(Promise<T> promise) { public static <T> T getResult(Promise<T> promise) {
return promise.future().toCompletionStage().toCompletableFuture().join(); try {
return promise.future().toCompletionStage().toCompletableFuture()
.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
} catch (TimeoutException e) {
throw new RuntimeException("等待Promise超时" + DEFAULT_TIMEOUT_SECONDS + "秒)", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new RuntimeException(cause != null ? cause : e);
}
} }
} }

View File

@@ -24,6 +24,7 @@ import java.lang.reflect.Method;
import java.net.URL; import java.net.URL;
import java.text.ParseException; import java.text.ParseException;
import java.util.*; import java.util.*;
import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -44,6 +45,12 @@ public final class ReflectionUtil {
// 缓存Reflections实例避免重复扫描每次扫描约35K+值耗时1-3秒占用大量内存 // 缓存Reflections实例避免重复扫描每次扫描约35K+值耗时1-3秒占用大量内存
private static final Map<String, Reflections> REFLECTIONS_CACHE = new java.util.concurrent.ConcurrentHashMap<>(); private static final Map<String, Reflections> REFLECTIONS_CACHE = new java.util.concurrent.ConcurrentHashMap<>();
// 预编译的类型匹配正则,避免每次请求重新编译
private static final Pattern BASIC_TYPE_PATTERN = Pattern.compile(
"^java\\.lang\\.((Boolean)|(Character)|(Byte)|(Short)|(Integer)|(Long)|(Float)|(Double)|(String))$");
private static final Pattern BASIC_TYPE_ARRAY_PATTERN = Pattern.compile(
"^(boolean|char|byte|short|int|long|float|double|String)\\[]$");
/** /**
* 以默认配置的基础包路径获取反射器 * 以默认配置的基础包路径获取反射器
* *
@@ -234,8 +241,7 @@ public final class ReflectionUtil {
if (ctClass.isPrimitive() || "java.util.Date".equals(ctClass.getName())) { if (ctClass.isPrimitive() || "java.util.Date".equals(ctClass.getName())) {
return true; return true;
} }
return ctClass.getName().matches("^java\\.lang\\.((Boolean)|(Character)|(Byte)|(Short)|(Integer)|(Long)|" + return BASIC_TYPE_PATTERN.matcher(ctClass.getName()).matches();
"(Float)|(Double)|(String))$");
} }
/** /**
@@ -246,7 +252,7 @@ public final class ReflectionUtil {
public static boolean isBasicTypeArray(CtClass ctClass) { public static boolean isBasicTypeArray(CtClass ctClass) {
if (!ctClass.isArray()) { if (!ctClass.isArray()) {
return false; return false;
} else return (ctClass.getName().matches("^(boolean|char|byte|short|int|long|float|double|String)\\[]$")); } else return BASIC_TYPE_ARRAY_PATTERN.matcher(ctClass.getName()).matches();
} }
/** /**

View File

@@ -12,14 +12,21 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
public class ResponseUtil { public class ResponseUtil {
public static void redirect(HttpServerResponse response, String url) { public static void redirect(HttpServerResponse response, String url) {
if (response.ended() || response.closed()) {
return;
}
response.putHeader(CONTENT_TYPE, "text/html; charset=utf-8") response.putHeader(CONTENT_TYPE, "text/html; charset=utf-8")
.putHeader("Referrer-Policy", "no-referrer") .putHeader("Referrer-Policy", "no-referrer")
.putHeader(HttpHeaders.LOCATION, url).setStatusCode(302).end(); .putHeader(HttpHeaders.LOCATION, url).setStatusCode(302).end();
} }
public static void redirect(HttpServerResponse response, String url, Promise<?> promise) { public static void redirect(HttpServerResponse response, String url, Promise<?> promise) {
try {
redirect(response, url); redirect(response, url);
promise.complete(); promise.tryComplete();
} catch (Throwable t) {
promise.tryFail(t);
}
} }
public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) { public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject) {
@@ -31,12 +38,18 @@ public class ResponseUtil {
} }
public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) { public static void fireJsonObjectResponse(RoutingContext ctx, JsonObject jsonObject, int statusCode) {
if (ctx.response().ended() || ctx.response().closed()) {
return;
}
ctx.response().putHeader(CONTENT_TYPE, "application/json; charset=utf-8") ctx.response().putHeader(CONTENT_TYPE, "application/json; charset=utf-8")
.setStatusCode(statusCode) .setStatusCode(statusCode)
.end(jsonObject.encode()); .end(jsonObject.encode());
} }
public static void fireJsonObjectResponse(HttpServerResponse ctx, JsonObject jsonObject, int statusCode) { public static void fireJsonObjectResponse(HttpServerResponse ctx, JsonObject jsonObject, int statusCode) {
if (ctx.ended() || ctx.closed()) {
return;
}
ctx.putHeader(CONTENT_TYPE, "application/json; charset=utf-8") ctx.putHeader(CONTENT_TYPE, "application/json; charset=utf-8")
.setStatusCode(statusCode) .setStatusCode(statusCode)
.end(jsonObject.encode()); .end(jsonObject.encode());
@@ -55,10 +68,16 @@ public class ResponseUtil {
} }
public static void fireTextResponse(RoutingContext ctx, String text) { public static void fireTextResponse(RoutingContext ctx, String text) {
if (ctx.response().ended() || ctx.response().closed()) {
return;
}
ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8").end(text); ctx.response().putHeader(CONTENT_TYPE, "text/html; charset=utf-8").end(text);
} }
public static void sendError(RoutingContext ctx, int statusCode) { public static void sendError(RoutingContext ctx, int statusCode) {
if (ctx.response().ended() || ctx.response().closed()) {
return;
}
ctx.response().setStatusCode(statusCode).end(); ctx.response().setStatusCode(statusCode).end();
} }
} }

View File

@@ -3,7 +3,6 @@ package cn.qaiu.vx.core.util;
import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.LocalMap; import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.shareddata.SharedData;
/** /**
* vertx 共享数据 * vertx 共享数据
@@ -13,10 +12,8 @@ import io.vertx.core.shareddata.SharedData;
*/ */
public class SharedDataUtil { public class SharedDataUtil {
private static final SharedData sharedData = VertxHolder.getVertxInstance().sharedData(); public static io.vertx.core.shareddata.SharedData shareData() {
return VertxHolder.getVertxInstance().sharedData();
public static SharedData shareData() {
return sharedData;
} }
public static LocalMap<String, Object> getLocalMap(String key) { public static LocalMap<String, Object> getLocalMap(String key) {
@@ -24,7 +21,7 @@ public class SharedDataUtil {
} }
public static <T> LocalMap<String, T> getLocalMapWithCast(String key) { public static <T> LocalMap<String, T> getLocalMapWithCast(String key) {
return sharedData.getLocalMap(key); return shareData().getLocalMap(key);
} }
public static JsonObject getJsonConfig(String key) { public static JsonObject getJsonConfig(String key) {

View File

@@ -1,10 +1,13 @@
package cn.qaiu.vx.core.verticle; package cn.qaiu.vx.core.verticle;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.*; import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.ProxyOptions; import io.vertx.core.net.ProxyOptions;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -24,13 +27,16 @@ public class HttpProxyVerticle extends AbstractVerticle {
private HttpClient httpClient; private HttpClient httpClient;
private NetClient netClient; private NetClient netClient;
private HttpServer httpServer;
private volatile boolean stopping = false;
private JsonObject proxyPreConf; private JsonObject proxyPreConf;
private JsonObject proxyServerConf; private JsonObject proxyServerConf;
@Override @Override
public void start() { public void start(io.vertx.core.Promise<Void> startPromise) {
stopping = false;
proxyServerConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-server"); proxyServerConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-server");
proxyPreConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-pre"); proxyPreConf = ((JsonObject)vertx.sharedData().getLocalMap(LOCAL).get(GLOBAL_CONFIG)).getJsonObject("proxy-pre");
Integer serverPort = proxyServerConf.getInteger("port"); Integer serverPort = proxyServerConf.getInteger("port");
@@ -41,7 +47,12 @@ public class HttpProxyVerticle extends AbstractVerticle {
} }
// 初始化 HTTP 客户端,用于向目标服务器发送 HTTP 请求 // 初始化 HTTP 客户端,用于向目标服务器发送 HTTP 请求
HttpClientOptions httpClientOptions = new HttpClientOptions(); HttpClientOptions httpClientOptions = new HttpClientOptions()
.setMaxPoolSize(64)
.setMaxWaitQueueSize(256)
.setConnectTimeout(15000)
.setIdleTimeout(60)
.setKeepAlive(true);
if (proxyOptions != null) { if (proxyOptions != null) {
httpClientOptions.setProxyOptions(proxyOptions); httpClientOptions.setProxyOptions(proxyOptions);
} }
@@ -54,14 +65,14 @@ public class HttpProxyVerticle extends AbstractVerticle {
httpServerOptions.setClientAuth(ClientAuth.REQUIRED); httpServerOptions.setClientAuth(ClientAuth.REQUIRED);
} }
HttpServer server = vertx.createHttpServer(); httpServer = vertx.createHttpServer(httpServerOptions);
server.requestHandler(this::handleClientRequest); httpServer.requestHandler(this::handleClientRequest);
// 初始化 NetClient用于在 CONNECT 请求中建立 TCP 连接隧道 // 初始化 NetClient用于在 CONNECT 请求中建立 TCP 连接隧道
NetClientOptions netClientOptions = new NetClientOptions(); NetClientOptions netClientOptions = new NetClientOptions();
if (proxyOptions != null) { if (proxyOptions != null) {
httpClientOptions.setProxyOptions(proxyOptions); netClientOptions.setProxyOptions(proxyOptions);
} }
netClient = vertx.createNetClient(netClientOptions netClient = vertx.createNetClient(netClientOptions
@@ -69,16 +80,22 @@ public class HttpProxyVerticle extends AbstractVerticle {
.setTrustAll(true)); .setTrustAll(true));
// 启动 HTTP 代理服务器 // 启动 HTTP 代理服务器
server.listen(serverPort) httpServer.listen(serverPort)
.onSuccess(res-> LOGGER.info("HTTP Proxy server started on port {}", serverPort)) .onSuccess(res -> {
.onFailure(err-> LOGGER.error("Failed to start HTTP Proxy server: " + err.getMessage())); LOGGER.info("HTTP Proxy server started on port {}", serverPort);
startPromise.complete();
})
.onFailure(err -> {
LOGGER.error("Failed to start HTTP Proxy server: " + err.getMessage(), err);
closeClients().onComplete(close -> startPromise.fail(err));
});
} }
// 处理 HTTP CONNECT 请求,用于代理 HTTPS 流量 // 处理 HTTP CONNECT 请求,用于代理 HTTPS 流量
private void handleConnectRequest(HttpServerRequest clientRequest) { private void handleConnectRequest(HttpServerRequest clientRequest) {
String[] uriParts = clientRequest.uri().split(":"); String[] uriParts = clientRequest.uri().split(":");
if (uriParts.length != 2) { if (uriParts.length != 2) {
clientRequest.response().setStatusCode(400).end("Bad Request: Invalid URI format"); failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid URI format");
return; return;
} }
@@ -88,19 +105,27 @@ public class HttpProxyVerticle extends AbstractVerticle {
try { try {
targetPort = Integer.parseInt(uriParts[1]); targetPort = Integer.parseInt(uriParts[1]);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
clientRequest.response().setStatusCode(400).end("Bad Request: Invalid port"); failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid port");
return; return;
} }
clientRequest.pause(); clientRequest.pause();
// 通过 NetClient 连接目标服务器并创建隧道 // 通过 NetClient 连接目标服务器并创建隧道
try {
netClient.connect(targetPort, targetHost) netClient.connect(targetPort, targetHost)
.onSuccess(targetSocket -> { .onSuccess(targetSocket -> {
// Upgrade client connection to NetSocket and implement bidirectional data flow // Upgrade client connection to NetSocket and implement bidirectional data flow
clientRequest.toNetSocket() clientRequest.toNetSocket()
.onSuccess(clientSocket -> { .onSuccess(clientSocket -> {
// Set up bidirectional data forwarding clientSocket.pipeTo(targetSocket)
clientSocket.handler(targetSocket::write); .onFailure(err -> {
targetSocket.handler(clientSocket::write); LOGGER.debug("CONNECT client -> target pipe closed", err);
closeTunnelSockets(clientSocket, targetSocket);
});
targetSocket.pipeTo(clientSocket)
.onFailure(err -> {
LOGGER.debug("CONNECT target -> client pipe closed", err);
closeTunnelSockets(clientSocket, targetSocket);
});
// Close the other socket when one side closes // Close the other socket when one side closes
clientSocket.closeHandler(v -> targetSocket.close()); clientSocket.closeHandler(v -> targetSocket.close());
@@ -109,24 +134,32 @@ public class HttpProxyVerticle extends AbstractVerticle {
.onFailure(clientSocketAttempt -> { .onFailure(clientSocketAttempt -> {
System.err.println("Failed to upgrade client connection to socket: " + clientSocketAttempt.getMessage()); System.err.println("Failed to upgrade client connection to socket: " + clientSocketAttempt.getMessage());
targetSocket.close(); targetSocket.close();
clientRequest.response().setStatusCode(500).end("Internal Server Error"); failClientRequestAndClose(clientRequest, 500, "Internal Server Error");
}); });
}) })
.onFailure(connectionAttempt -> { .onFailure(connectionAttempt -> {
System.err.println("Failed to connect to target: " + connectionAttempt.getMessage()); LOGGER.warn("Failed to connect to target: {}", connectionAttempt.getMessage());
clientRequest.response().setStatusCode(502).end("Bad Gateway: Unable to connect to target"); failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to connect to target");
}); });
} catch (Exception e) {
LOGGER.warn("CONNECT 请求创建失败", e);
failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to connect to target");
}
} }
// 处理客户端的 HTTP 请求 // 处理客户端的 HTTP 请求
private void handleClientRequest(HttpServerRequest clientRequest) { private void handleClientRequest(HttpServerRequest clientRequest) {
if (stopping) {
failClientResponse(clientRequest.response(), 503, "Service Unavailable");
return;
}
// 打印来源ip和访问目标URI // 打印来源ip和访问目标URI
LOGGER.debug("source: {}, target: {}", clientRequest.remoteAddress().toString(), clientRequest.uri()); LOGGER.debug("source: {}, target: {}", clientRequest.remoteAddress().toString(), clientRequest.uri());
if (proxyServerConf.containsKey("username") && if (proxyServerConf.containsKey("username") &&
StringUtils.isNotBlank(proxyServerConf.getString("username"))) { StringUtils.isNotBlank(proxyServerConf.getString("username"))) {
String s = clientRequest.headers().get("Proxy-Authorization"); String s = clientRequest.headers().get("Proxy-Authorization");
if (s == null) { if (s == null) {
clientRequest.response().setStatusCode(403).end(); failClientResponse(clientRequest.response(), 403, null);
return; return;
} }
String[] split; String[] split;
@@ -134,19 +167,19 @@ public class HttpProxyVerticle extends AbstractVerticle {
split = new String(Base64.getDecoder().decode(s.replace("Basic ", ""))).split(":"); split = new String(Base64.getDecoder().decode(s.replace("Basic ", ""))).split(":");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
LOGGER.warn("Proxy-Authorization header is not valid Base64"); LOGGER.warn("Proxy-Authorization header is not valid Base64");
clientRequest.response().setStatusCode(403).end(); failClientResponse(clientRequest.response(), 403, null);
return; return;
} }
if (split.length <= 1) { if (split.length <= 1) {
LOGGER.warn("Proxy-Authorization header format invalid: missing username:password separator"); LOGGER.warn("Proxy-Authorization header format invalid: missing username:password separator");
clientRequest.response().setStatusCode(403).end(); failClientResponse(clientRequest.response(), 403, null);
return; return;
} }
String username = proxyServerConf.getString("username"); String username = proxyServerConf.getString("username");
String password = proxyServerConf.getString("password"); String password = proxyServerConf.getString("password");
if (!split[0].equals(username) || !split[1].equals(password)) { if (!split[0].equals(username) || !split[1].equals(password)) {
LOGGER.info("-----auth failed------\nusername: {}", split[0]); LOGGER.info("-----auth failed------\nusername: {}", split[0]);
clientRequest.response().setStatusCode(403).end(); failClientResponse(clientRequest.response(), 403, null);
return; return;
} }
} }
@@ -165,40 +198,147 @@ public class HttpProxyVerticle extends AbstractVerticle {
// 获取目标主机 // 获取目标主机
String hostHeader = clientRequest.getHeader("Host"); String hostHeader = clientRequest.getHeader("Host");
if (hostHeader == null) { if (hostHeader == null) {
clientRequest.response().setStatusCode(400).end("Host header is missing"); failClientResponse(clientRequest.response(), 400, "Host header is missing");
return; return;
} }
String targetHost = hostHeader.split(":")[0]; HostAndPort target;
int targetPort = extractPortFromUrl(clientRequest.uri()); // 默认为 HTTP 的端口 try {
clientRequest.pause(); // 暂停客户端请求的读取,避免数据丢失 target = parseHostHeader(hostHeader);
} catch (IllegalArgumentException e) {
failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid Host header");
return;
}
String targetHost = target.host();
int targetPort = extractPortFromUrl(clientRequest.uri(), target.port()); // 默认为 HTTP 的端口
if (targetPort <= 0) {
failClientResponse(clientRequest.response(), 400, "Bad Request: Invalid target port");
return;
}
clientRequest.pause(); // 暂停客户端请求的读取,等上游请求创建完成
try {
httpClient.request(clientRequest.method(), targetPort, targetHost, clientRequest.uri()) httpClient.request(clientRequest.method(), targetPort, targetHost, clientRequest.uri())
.onSuccess(request -> { .onSuccess(request -> {
clientRequest.resume(); // 恢复客户端请求的读取
// 逐个设置请求头 // 逐个设置请求头
clientRequest.headers().forEach(header -> request.putHeader(header.getKey(), header.getValue())); clientRequest.headers().forEach(header -> request.putHeader(header.getKey(), header.getValue()));
// 将客户端请求的 body 转发给目标服务器 request.response()
clientRequest.bodyHandler(body ->
request.send(body)
.onSuccess(response -> { .onSuccess(response -> {
clientRequest.response().setStatusCode(response.statusCode()); HttpServerResponse clientResponse = clientRequest.response();
clientRequest.response().headers().setAll(response.headers()); if (clientResponse.ended() || clientResponse.closed()) {
response.body() response.resume();
.onSuccess(b -> clientRequest.response().end(b)) return;
.onFailure(err -> clientRequest.response() }
.setStatusCode(502).end("Bad Gateway: Unable to reach target")); clientResponse.setStatusCode(response.statusCode());
clientResponse.headers().setAll(response.headers());
response.pipeTo(clientResponse)
.onFailure(err -> {
LOGGER.error("HTTP代理响应转发失败", err);
try {
response.request().reset();
} catch (Exception e) {
LOGGER.debug("HTTP代理上游响应已关闭", e);
}
failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target");
});
}) })
.onFailure(err -> clientRequest.response() .onFailure(err -> {
.setStatusCode(502).end("Bad Gateway: Unable to reach target")) LOGGER.error("HTTP代理响应失败", err);
); try {
request.reset();
} catch (Exception e) {
LOGGER.debug("HTTP代理上游请求已关闭", e);
}
failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target");
});
clientRequest.pipeTo(request)
.onFailure(err -> {
LOGGER.error("HTTP代理请求转发失败", err);
try {
request.reset();
} catch (Exception e) {
LOGGER.debug("HTTP代理上游请求已关闭", e);
}
failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Unable to reach target");
});
clientRequest.resume();
}) })
.onFailure(err -> { .onFailure(err -> {
LOGGER.error("HTTP请求失败", err); LOGGER.error("HTTP请求失败", err);
clientRequest.response().setStatusCode(502).end("Bad Gateway: Request failed"); failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Request failed");
}); });
} catch (Exception e) {
LOGGER.error("HTTP请求创建失败", e);
failClientRequestAndClose(clientRequest, 502, "Bad Gateway: Request failed");
}
}
private void failClientResponse(HttpServerResponse response, String message) {
failClientResponse(response, 502, message);
}
private void failClientResponse(HttpServerResponse response, int statusCode, String message) {
if (response.ended() || response.closed()) {
return;
}
try {
if (!response.headWritten()) {
response.setStatusCode(statusCode);
if (message == null) {
response.end();
} else {
response.end(message);
}
} else {
response.reset();
}
} catch (Exception e) {
LOGGER.debug("客户端响应已关闭,忽略代理错误响应", e);
}
}
private void failClientRequestAndClose(HttpServerRequest request, int statusCode, String message) {
HttpServerResponse response = request.response();
if (response.ended() || response.closed()) {
closeClientConnection(request);
return;
}
try {
if (!response.headWritten()) {
response.setStatusCode(statusCode);
Future<Void> endFuture = message == null ? response.end() : response.end(message);
endFuture.onComplete(v -> closeClientConnection(request));
} else {
response.reset();
closeClientConnection(request);
}
} catch (Exception e) {
LOGGER.debug("客户端响应已关闭,关闭代理连接", e);
closeClientConnection(request);
}
}
private void closeClientConnection(HttpServerRequest request) {
try {
request.connection().close();
} catch (Exception e) {
LOGGER.debug("关闭客户端代理连接失败", e);
}
}
private void closeTunnelSockets(NetSocket clientSocket, NetSocket targetSocket) {
try {
clientSocket.close();
} catch (Exception e) {
LOGGER.debug("关闭CONNECT客户端socket失败", e);
}
try {
targetSocket.close();
} catch (Exception e) {
LOGGER.debug("关闭CONNECT目标socket失败", e);
}
} }
@@ -209,6 +349,10 @@ public class HttpProxyVerticle extends AbstractVerticle {
* @return 提取的端口号,如果没有指定端口,则返回默认端口 * @return 提取的端口号,如果没有指定端口,则返回默认端口
*/ */
public static int extractPortFromUrl(String urlString) { public static int extractPortFromUrl(String urlString) {
return extractPortFromUrl(urlString, 80);
}
public static int extractPortFromUrl(String urlString, int defaultPort) {
try { try {
URI uri = new URI(urlString); URI uri = new URI(urlString);
int port = uri.getPort(); int port = uri.getPort();
@@ -217,7 +361,7 @@ public class HttpProxyVerticle extends AbstractVerticle {
if ("https".equalsIgnoreCase(uri.getScheme())) { if ("https".equalsIgnoreCase(uri.getScheme())) {
port = 443; // HTTPS 默认端口 port = 443; // HTTPS 默认端口
} else { } else {
port = 80; // HTTP 默认端口 port = defaultPort; // HTTP 默认端口
} }
} }
return port; return port;
@@ -228,16 +372,48 @@ public class HttpProxyVerticle extends AbstractVerticle {
} }
} }
private HostAndPort parseHostHeader(String hostHeader) {
if (hostHeader.startsWith("[")) {
int end = hostHeader.indexOf(']');
if (end > 0) {
String host = hostHeader.substring(1, end);
int port = 80;
if (hostHeader.length() > end + 2 && hostHeader.charAt(end + 1) == ':') {
port = Integer.parseInt(hostHeader.substring(end + 2));
}
return new HostAndPort(host, port);
}
}
int lastColon = hostHeader.lastIndexOf(':');
if (lastColon > 0 && hostHeader.indexOf(':') == lastColon) {
return new HostAndPort(hostHeader.substring(0, lastColon), Integer.parseInt(hostHeader.substring(lastColon + 1)));
}
return new HostAndPort(hostHeader, 80);
}
private record HostAndPort(String host, int port) {
}
@Override @Override
public void stop() { public void stop(Promise<Void> stopPromise) {
// 停止 HTTP 客户端以释放资源 stopping = true;
if (httpClient != null) { Future<Void> serverClose = httpServer == null ? Future.succeededFuture() : httpServer.close();
httpClient.close(); serverClose.onComplete(serverResult -> closeClients().onComplete(clientResult -> {
if (serverResult.failed()) {
stopPromise.fail(serverResult.cause());
} else if (clientResult.failed()) {
stopPromise.fail(clientResult.cause());
} else {
stopPromise.complete();
} }
if (netClient != null) { }));
netClient.close();
} }
private Future<Void> closeClients() {
Future<Void> httpClientClose = httpClient == null ? Future.succeededFuture() : httpClient.close();
Future<Void> netClientClose = netClient == null ? Future.succeededFuture() : netClient.close();
return Future.all(httpClientClose, netClientClose).mapEmpty();
} }
} }

View File

@@ -3,17 +3,20 @@ package cn.qaiu.vx.core.verticle;
import cn.qaiu.vx.core.util.*; import cn.qaiu.vx.core.util.*;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.ext.web.Route; import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router; import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.StaticHandler; import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.proxy.handler.ProxyHandler; import io.vertx.ext.web.proxy.handler.ProxyHandler;
import io.vertx.httpproxy.HttpProxy; import io.vertx.httpproxy.HttpProxy;
@@ -27,7 +30,9 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -45,10 +50,6 @@ public class ReverseProxyVerticle extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(ReverseProxyVerticle.class); private static final Logger LOGGER = LoggerFactory.getLogger(ReverseProxyVerticle.class);
private static final String PATH_PROXY_CONFIG = SharedDataUtil
.getJsonConfig(ConfigConstant.GLOBAL_CONFIG)
.getString("proxyConf");
private static final Future<JsonObject> CONFIG = ConfigUtil.readYamlConfig(PATH_PROXY_CONFIG);
private static final String DEFAULT_PATH_404 = "webroot/err/page404.html"; private static final String DEFAULT_PATH_404 = "webroot/err/page404.html";
private static String serverName = "Vert.x-proxy-server"; //Server name in Http response header private static String serverName = "Vert.x-proxy-server"; //Server name in Http response header
@@ -58,26 +59,40 @@ public class ReverseProxyVerticle extends AbstractVerticle {
/** /**
* 【优化】HttpClient连接池按host:port缓存复用避免每个请求都创建新连接 * 【优化】HttpClient连接池按host:port缓存复用避免每个请求都创建新连接
*/ */
private final Map<String, HttpClient> httpClientPool = new ConcurrentHashMap<>(); private final Map<String, HttpClientEntry> httpClientPool = new ConcurrentHashMap<>();
private final List<HttpServer> httpServers = new ArrayList<>();
private volatile boolean stopping = false;
/**
* 连接池条目。HttpProxy 会持有这里的 HttpClient 引用,不能在路由仍可用时关闭。
*/
private static class HttpClientEntry {
final HttpClient client;
HttpClientEntry(HttpClient client) {
this.client = client;
}
}
/** /**
* 【优化】高并发场景下的HttpClient配置 * 【优化】高并发场景下的HttpClient配置
*/ */
private static final int MAX_POOL_SIZE = 100; // 最大连接池大小 private static final int MAX_POOL_SIZE = 32; // 最大连接池大小
private static final int MAX_WAIT_QUEUE_SIZE = 500; // 最大等待队列大小 private static final int MAX_WAIT_QUEUE_SIZE = 128; // 最大等待队列大小
private static final int CONNECT_TIMEOUT = 30000; // 连接超时30秒 private static final int CONNECT_TIMEOUT = 30000; // 连接超时30秒
private static final int IDLE_TIMEOUT = 60; // 空闲超时60秒 private static final int IDLE_TIMEOUT = 60; // 空闲超时60秒
private static final boolean KEEP_ALIVE = true; // 启用Keep-Alive private static final boolean KEEP_ALIVE = true; // 启用Keep-Alive
private static final boolean PIPELINING = true; // 启用HTTP管线化 private static final boolean PIPELINING = false; // 代理场景关闭管线化,避免慢响应堆积
@Override @Override
public void start(Promise<Void> startPromise) { public void start(Promise<Void> startPromise) {
CONFIG.onSuccess(this::handleProxyConfList).onFailure(e -> { stopping = false;
String pathProxyConfig = SharedDataUtil
.getJsonConfig(ConfigConstant.GLOBAL_CONFIG)
.getString("proxyConf");
ConfigUtil.readYamlConfig(pathProxyConfig).onSuccess(config -> startProxyServers(config).onComplete(startPromise)).onFailure(e -> {
LOGGER.info("web代理配置已禁用当前仅支持API调用"); LOGGER.info("web代理配置已禁用当前仅支持API调用");
});
// createFileListener
startPromise.complete(); startPromise.complete();
});
} }
/** /**
@@ -85,16 +100,49 @@ public class ReverseProxyVerticle extends AbstractVerticle {
*/ */
@Override @Override
public void stop(Promise<Void> stopPromise) { public void stop(Promise<Void> stopPromise) {
LOGGER.info("Stopping ReverseProxyVerticle, closing {} HttpClient connections...", httpClientPool.size()); stopping = true;
httpClientPool.values().forEach(client -> { LOGGER.info("Stopping ReverseProxyVerticle, closing {} servers and {} HttpClient connections...",
httpServers.size(), httpClientPool.size());
List<Future<Void>> serverCloseFutures = new ArrayList<>();
httpServers.forEach(server -> serverCloseFutures.add(server.close()));
Future<Void> serverCloseFuture = serverCloseFutures.isEmpty()
? Future.succeededFuture()
: Future.all(serverCloseFutures).mapEmpty();
serverCloseFuture.onComplete(serverClose -> {
List<Future<Void>> clientCloseFutures = new ArrayList<>();
closeHttpClients(clientCloseFutures);
Future<Void> clientCloseFuture = clientCloseFutures.isEmpty()
? Future.succeededFuture()
: Future.all(clientCloseFutures).mapEmpty();
clientCloseFuture.onComplete(clientClose -> {
if (serverClose.succeeded()) {
httpServers.clear();
}
if (clientClose.succeeded()) {
httpClientPool.clear();
}
if (serverClose.failed()) {
stopPromise.fail(serverClose.cause());
} else if (clientClose.failed()) {
stopPromise.fail(clientClose.cause());
} else {
stopPromise.complete();
}
});
});
}
private void closeHttpClients(List<Future<Void>> closeFutures) {
httpClientPool.values().forEach(entry -> {
try { try {
client.close(); closeFutures.add(entry.client.close());
} catch (Exception e) { } catch (Exception e) {
LOGGER.warn("Error closing HttpClient: {}", e.getMessage()); LOGGER.warn("Error closing HttpClient: {}", e.getMessage());
} }
}); });
httpClientPool.clear();
stopPromise.complete();
} }
/** /**
@@ -105,7 +153,7 @@ public class ReverseProxyVerticle extends AbstractVerticle {
*/ */
private HttpClient getOrCreateHttpClient(String host, int port) { private HttpClient getOrCreateHttpClient(String host, int port) {
String key = host + ":" + port; String key = host + ":" + port;
return httpClientPool.computeIfAbsent(key, k -> { HttpClientEntry entry = httpClientPool.computeIfAbsent(key, k -> {
LOGGER.info("Creating new HttpClient for {}", key); LOGGER.info("Creating new HttpClient for {}", key);
HttpClientOptions options = new HttpClientOptions() HttpClientOptions options = new HttpClientOptions()
.setMaxPoolSize(MAX_POOL_SIZE) // 连接池大小 .setMaxPoolSize(MAX_POOL_SIZE) // 连接池大小
@@ -116,15 +164,16 @@ public class ReverseProxyVerticle extends AbstractVerticle {
.setKeepAliveTimeout(120) // Keep-Alive超时120秒 .setKeepAliveTimeout(120) // Keep-Alive超时120秒
.setPipelining(PIPELINING) // HTTP管线化 .setPipelining(PIPELINING) // HTTP管线化
.setPipeliningLimit(10) // 管线化限制 .setPipeliningLimit(10) // 管线化限制
.setDecompressionSupported(true) // 支持解压响应 .setDecompressionSupported(false) // 代理不解压,避免放大内存
.setTcpKeepAlive(true) // TCP Keep-Alive .setTcpKeepAlive(true) // TCP Keep-Alive
.setTcpNoDelay(true) // 禁用Nagle算法降低延迟 .setTcpNoDelay(true) // 禁用Nagle算法降低延迟
.setTcpFastOpen(true) // 启用TCP Fast Open .setTcpFastOpen(true) // 启用TCP Fast Open
.setTcpQuickAck(true) // 启用TCP Quick ACK .setTcpQuickAck(true) // 启用TCP Quick ACK
.setReuseAddress(true) // 允许地址重用 .setReuseAddress(true) // 允许地址重用
.setReusePort(true); // 允许端口重用 .setReusePort(true); // 允许端口重用
return vertx.createHttpClient(options); return new HttpClientEntry(vertx.createHttpClient(options));
}); });
return entry.client;
} }
/** /**
@@ -137,7 +186,7 @@ public class ReverseProxyVerticle extends AbstractVerticle {
* *
* @param config proxy config * @param config proxy config
*/ */
private void handleProxyConfList(JsonObject config) { private Future<Void> startProxyServers(JsonObject config) {
serverName = config.getString("server-name"); serverName = config.getString("server-name");
// 解析全局 trusted-proxies // 解析全局 trusted-proxies
JsonArray trustedArr = config.getJsonArray("trusted-proxies"); JsonArray trustedArr = config.getJsonArray("trusted-proxies");
@@ -149,13 +198,15 @@ public class ReverseProxyVerticle extends AbstractVerticle {
}); });
} }
JsonArray proxyConfList = config.getJsonArray("proxy"); JsonArray proxyConfList = config.getJsonArray("proxy");
List<Future<Void>> listenFutures = new ArrayList<>();
if (proxyConfList != null) { if (proxyConfList != null) {
proxyConfList.forEach(proxyConf -> { proxyConfList.forEach(proxyConf -> {
if (proxyConf instanceof JsonObject) { if (proxyConf instanceof JsonObject) {
handleProxyConf((JsonObject) proxyConf); listenFutures.add(handleProxyConf((JsonObject) proxyConf));
} }
}); });
} }
return listenFutures.isEmpty() ? Future.succeededFuture() : Future.all(listenFutures).mapEmpty();
} }
/** /**
@@ -201,7 +252,7 @@ public class ReverseProxyVerticle extends AbstractVerticle {
* *
* @param proxyConf 代理配置 * @param proxyConf 代理配置
*/ */
private void handleProxyConf(JsonObject proxyConf) { private Future<Void> handleProxyConf(JsonObject proxyConf) {
// page404 path // page404 path
if (proxyConf.containsKey( if (proxyConf.containsKey(
@@ -226,6 +277,10 @@ public class ReverseProxyVerticle extends AbstractVerticle {
// Add Server name header // Add Server name header
proxyRouter.route().handler(ctx -> { proxyRouter.route().handler(ctx -> {
if (stopping) {
sendProxyError(ctx, 503, "Service Unavailable");
return;
}
String realPath = ctx.request().uri(); String realPath = ctx.request().uri();
if (realPath.startsWith(REROUTE_PATH_PREFIX)) { if (realPath.startsWith(REROUTE_PATH_PREFIX)) {
// vertx web proxy暂不支持rewrite, 所以这里进行手动替换, 请求地址中的请求path前缀替换为originPath // vertx web proxy暂不支持rewrite, 所以这里进行手动替换, 请求地址中的请求path前缀替换为originPath
@@ -234,7 +289,9 @@ public class ReverseProxyVerticle extends AbstractVerticle {
return; return;
} }
if (!ctx.response().ended() && !ctx.response().closed()) {
ctx.response().putHeader("Server", serverName); ctx.response().putHeader("Server", serverName);
}
ctx.next(); ctx.next();
}); });
@@ -250,15 +307,19 @@ public class ReverseProxyVerticle extends AbstractVerticle {
// Send page404 page // Send page404 page
proxyRouter.errorHandler(404, ctx -> { proxyRouter.errorHandler(404, ctx -> {
ctx.response().sendFile(proxyConf.getString("page404")); sendNotFoundPage(ctx, proxyConf.getString("page404"));
}); });
proxyRouter.errorHandler(500, this::handleProxyFailure);
HttpServer server = getHttpsServer(proxyConf); HttpServer server = getHttpsServer(proxyConf);
server.requestHandler(proxyRouter); server.requestHandler(proxyRouter);
Integer port = proxyConf.getInteger("listen"); Integer port = proxyConf.getInteger("listen");
LOGGER.info("proxy server start on {} port", port); LOGGER.info("proxy server start on {} port", port);
server.listen(port); return server.listen(port)
.onSuccess(s -> httpServers.add(s))
.onFailure(e -> LOGGER.error("proxy server start failed on {} port", port, e))
.mapEmpty();
} }
private HttpServer getHttpsServer(JsonObject proxyConf) { private HttpServer getHttpsServer(JsonObject proxyConf) {
@@ -267,7 +328,7 @@ public class ReverseProxyVerticle extends AbstractVerticle {
.setTcpKeepAlive(true) // TCP Keep-Alive .setTcpKeepAlive(true) // TCP Keep-Alive
.setTcpNoDelay(true) // 禁用Nagle算法 .setTcpNoDelay(true) // 禁用Nagle算法
.setCompressionSupported(true) // 启用压缩 .setCompressionSupported(true) // 启用压缩
.setAcceptBacklog(50000) // 增加积压队列到50000 .setAcceptBacklog(1024) // 限制积压队列,避免小容器内存膨胀
.setIdleTimeout(120) // 空闲超时120秒 .setIdleTimeout(120) // 空闲超时120秒
.setTcpFastOpen(true) // 启用TCP Fast Open .setTcpFastOpen(true) // 启用TCP Fast Open
.setTcpQuickAck(true) // 启用TCP Quick ACK .setTcpQuickAck(true) // 启用TCP Quick ACK
@@ -303,6 +364,67 @@ public class ReverseProxyVerticle extends AbstractVerticle {
return vertx.createHttpServer(httpServerOptions); return vertx.createHttpServer(httpServerOptions);
} }
private void addProxyHandler(Route route, HttpProxy httpProxy) {
Handler<RoutingContext> proxyHandler = ProxyHandler.create(httpProxy);
route.handler(ctx -> {
try {
proxyHandler.handle(ctx);
} catch (Throwable t) {
LOGGER.error("反向代理处理异常", t);
ctx.fail(t);
}
}).failureHandler(this::handleProxyFailure);
}
private void handleProxyFailure(RoutingContext ctx) {
Throwable failure = ctx.failure();
if (failure != null) {
LOGGER.error("反向代理路由失败", failure);
}
int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : 502;
if (statusCode < 400) {
statusCode = 502;
}
sendProxyError(ctx, statusCode, "Bad Gateway");
}
private void sendNotFoundPage(RoutingContext ctx, String page404) {
HttpServerResponse response = ctx.response();
if (response.ended() || response.closed()) {
return;
}
try {
if (response.headWritten()) {
response.reset();
return;
}
response.sendFile(page404)
.onFailure(e -> {
LOGGER.warn("发送代理 404 页面失败: {}", page404, e);
sendProxyError(ctx, 404, "404 not found");
});
} catch (Exception e) {
LOGGER.warn("发送代理 404 页面异常: {}", page404, e);
sendProxyError(ctx, 404, "404 not found");
}
}
private void sendProxyError(RoutingContext ctx, int statusCode, String message) {
HttpServerResponse response = ctx.response();
if (response.ended() || response.closed()) {
return;
}
try {
if (!response.headWritten()) {
response.setStatusCode(statusCode).end(message);
} else {
response.reset();
}
} catch (Exception e) {
LOGGER.debug("代理响应已关闭,忽略错误响应", e);
}
}
/** /**
* 处理静态资源配置 * 处理静态资源配置
* *
@@ -391,15 +513,14 @@ public class ReverseProxyVerticle extends AbstractVerticle {
if (StringUtils.isEmpty(originPath) || path.equals(originPath)) { if (StringUtils.isEmpty(originPath) || path.equals(originPath)) {
Route route = path.startsWith("~") ? proxyRouter.routeWithRegex(path.substring(1)) Route route = path.startsWith("~") ? proxyRouter.routeWithRegex(path.substring(1))
: proxyRouter.route(path); : proxyRouter.route(path);
// 【优化】为代理处理器添加超时 addProxyHandler(route, httpProxy);
route.handler(ProxyHandler.create(httpProxy));
} else { } else {
// 配置 /api/, / => 请求 /api/test 代理后 /test // 配置 /api/, / => 请求 /api/test 代理后 /test
// 配置 /api/, /xxx => 请求 /api/test 代理后 /xxx/test // 配置 /api/, /xxx => 请求 /api/test 代理后 /xxx/test
final String path0 = path; final String path0 = path;
final String originPath0 = REROUTE_PATH_PREFIX + originPath; final String originPath0 = REROUTE_PATH_PREFIX + originPath;
proxyRouter.route(originPath0 + "*").handler(ProxyHandler.create(httpProxy)); addProxyHandler(proxyRouter.route(originPath0 + "*"), httpProxy);
proxyRouter.route(path0 + "*").handler(ctx -> { proxyRouter.route(path0 + "*").handler(ctx -> {
String realPath = ctx.request().uri(); String realPath = ctx.request().uri();
if (realPath.startsWith(path0)) { if (realPath.startsWith(path0)) {

View File

@@ -22,21 +22,22 @@ public class RouterVerticle extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(RouterVerticle.class); private static final Logger LOGGER = LoggerFactory.getLogger(RouterVerticle.class);
private static final int port = SharedDataUtil.getValueForServerConfig("port");
private static final JsonObject globalConfig = SharedDataUtil.getJsonConfig("globalConfig");
private HttpServer server; private HttpServer server;
private Router router; private Router router;
private int port;
private JsonObject globalConfig;
static { static {
LOGGER.info(JacksonConfig.class.getSimpleName() + " >> "); LOGGER.info(JacksonConfig.class.getSimpleName() + " >> ");
JacksonConfig.nothing(); JacksonConfig.nothing();
LOGGER.info("To start listening to port {} ......", port);
} }
@Override @Override
public void start(Promise<Void> startPromise) { public void start(Promise<Void> startPromise) {
port = SharedDataUtil.getValueForServerConfig("port");
globalConfig = SharedDataUtil.getJsonConfig("globalConfig");
LOGGER.info("To start listening to port {} ......", port);
// 端口是否占用 // 端口是否占用
if (CommonUtil.isPortUsing(port)) { if (CommonUtil.isPortUsing(port)) {
throw new RuntimeException("Start fail: the '" + port + "' port is already in use..."); throw new RuntimeException("Start fail: the '" + port + "' port is already in use...");
@@ -64,7 +65,7 @@ public class RouterVerticle extends AbstractVerticle {
SharedDataUtil.getJsonStringForServerConfig("contextPath")).createRouter(); SharedDataUtil.getJsonStringForServerConfig("contextPath")).createRouter();
server = vertx.createHttpServer(options); server = vertx.createHttpServer(options);
server.requestHandler(router).webSocketHandler(s->{}).listen() server.requestHandler(router).listen()
.onSuccess(s -> startPromise.complete()) .onSuccess(s -> startPromise.complete())
.onFailure(e -> startPromise.fail(e.getCause())); .onFailure(e -> startPromise.fail(e.getCause()));
} }