fix: ServiceVerticle 添加 stop() 方法注销 EventBus 消费者,修复重部署时消费者累积泄漏

保存已注册的 EventBus 地址列表,在 stop() 中通过 ServiceBinder 逐一注销。
原实现有 start() 无 stop(),Verticle 重部署时旧消费者不会被注销,导致重复注册。
This commit is contained in:
yukaidi
2026-05-28 23:15:12 +08:00
parent 255e7b2fb5
commit 1f4c7019d4

View File

@@ -10,6 +10,8 @@ import org.reflections.Reflections;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +26,7 @@ public class ServiceVerticle extends AbstractVerticle {
Logger LOGGER = LoggerFactory.getLogger(ServiceVerticle.class); Logger LOGGER = LoggerFactory.getLogger(ServiceVerticle.class);
private static final AtomicInteger ID = new AtomicInteger(1); private static final AtomicInteger ID = new AtomicInteger(1);
private static final Set<Class<?>> handlers; private static final Set<Class<?>> handlers;
private final List<String> registeredAddresses = new ArrayList<>();
static { static {
Reflections reflections = ReflectionUtil.getReflections(); Reflections reflections = ReflectionUtil.getReflections();
@@ -39,7 +42,9 @@ public class ServiceVerticle extends AbstractVerticle {
try { try {
serviceNames.append(asyncService.getName()).append("|"); serviceNames.append(asyncService.getName()).append("|");
BaseAsyncService asInstance = (BaseAsyncService) ReflectionUtil.newWithNoParam(asyncService); BaseAsyncService asInstance = (BaseAsyncService) ReflectionUtil.newWithNoParam(asyncService);
binder.setAddress(asInstance.getAddress()).register(asInstance.getAsyncInterfaceClass(), asInstance); String address = asInstance.getAddress();
binder.setAddress(address).register(asInstance.getAsyncInterfaceClass(), asInstance);
registeredAddresses.add(address);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("Failed to register service: {}", asyncService.getName(), e); LOGGER.error("Failed to register service: {}", asyncService.getName(), e);
} }
@@ -49,4 +54,19 @@ public class ServiceVerticle extends AbstractVerticle {
} }
startPromise.complete(); startPromise.complete();
} }
@Override
public void stop(Promise<Void> stopPromise) {
ServiceBinder binder = new ServiceBinder(vertx);
registeredAddresses.forEach(address -> {
try {
binder.setAddress(address).unregister(address);
} catch (Exception e) {
LOGGER.debug("Failed to unregister service at address: {}", address, e);
}
});
registeredAddresses.clear();
LOGGER.info("ServiceVerticle stopped, unregistered {} services", registeredAddresses.size());
stopPromise.complete();
}
} }