From 1f4c7019d4a74a5d6c56a67b6e7a5dad1ddece66 Mon Sep 17 00:00:00 2001 From: yukaidi Date: Thu, 28 May 2026 23:15:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20ServiceVerticle=20=E6=B7=BB=E5=8A=A0=20s?= =?UTF-8?q?top()=20=E6=96=B9=E6=B3=95=E6=B3=A8=E9=94=80=20EventBus=20?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=EF=BC=8C=E4=BF=AE=E5=A4=8D=E9=87=8D?= =?UTF-8?q?=E9=83=A8=E7=BD=B2=E6=97=B6=E6=B6=88=E8=B4=B9=E8=80=85=E7=B4=AF?= =?UTF-8?q?=E7=A7=AF=E6=B3=84=E6=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 保存已注册的 EventBus 地址列表,在 stop() 中通过 ServiceBinder 逐一注销。 原实现有 start() 无 stop(),Verticle 重部署时旧消费者不会被注销,导致重复注册。 --- .../vx/core/verticle/ServiceVerticle.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/cn/qaiu/vx/core/verticle/ServiceVerticle.java b/core/src/main/java/cn/qaiu/vx/core/verticle/ServiceVerticle.java index 946b339..05b8311 100644 --- a/core/src/main/java/cn/qaiu/vx/core/verticle/ServiceVerticle.java +++ b/core/src/main/java/cn/qaiu/vx/core/verticle/ServiceVerticle.java @@ -10,6 +10,8 @@ import org.reflections.Reflections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -24,6 +26,7 @@ public class ServiceVerticle extends AbstractVerticle { Logger LOGGER = LoggerFactory.getLogger(ServiceVerticle.class); private static final AtomicInteger ID = new AtomicInteger(1); private static final Set> handlers; + private final List registeredAddresses = new ArrayList<>(); static { Reflections reflections = ReflectionUtil.getReflections(); @@ -39,7 +42,9 @@ public class ServiceVerticle extends AbstractVerticle { try { serviceNames.append(asyncService.getName()).append("|"); BaseAsyncService asInstance = (BaseAsyncService) ReflectionUtil.newWithNoParam(asyncService); - binder.setAddress(asInstance.getAddress()).register(asInstance.getAsyncInterfaceClass(), asInstance); + String address = asInstance.getAddress(); + binder.setAddress(address).register(asInstance.getAsyncInterfaceClass(), asInstance); + registeredAddresses.add(address); } catch (Exception e) { LOGGER.error("Failed to register service: {}", asyncService.getName(), e); } @@ -49,4 +54,19 @@ public class ServiceVerticle extends AbstractVerticle { } startPromise.complete(); } + + @Override + public void stop(Promise stopPromise) { + ServiceBinder binder = new ServiceBinder(vertx); + registeredAddresses.forEach(address -> { + try { + binder.setAddress(address).unregister(address); + } catch (Exception e) { + LOGGER.debug("Failed to unregister service at address: {}", address, e); + } + }); + registeredAddresses.clear(); + LOGGER.info("ServiceVerticle stopped, unregistered {} services", registeredAddresses.size()); + stopPromise.complete(); + } }