fix: ServiceVerticle 保存 MessageConsumer 引用,修复 unregister 参数类型错误

审查发现 unregister(address) 参数类型不匹配,ServiceBinder.unregister() 需要
MessageConsumer 而非 String。改为保存 register() 返回的 MessageConsumer,
stop() 中直接调用 consumer.unregister()。同时修复日志在 clear() 后读 size 始终为 0 的 bug。
This commit is contained in:
yukaidi
2026-05-28 23:42:24 +08:00
parent 0b024a849a
commit 6d24388690

View File

@@ -5,6 +5,8 @@ import cn.qaiu.vx.core.base.BaseAsyncService;
import cn.qaiu.vx.core.util.ReflectionUtil; import cn.qaiu.vx.core.util.ReflectionUtil;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.serviceproxy.ServiceBinder; import io.vertx.serviceproxy.ServiceBinder;
import org.reflections.Reflections; import org.reflections.Reflections;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -26,7 +28,7 @@ public class ServiceVerticle extends AbstractVerticle {
Logger LOGGER = LoggerFactory.getLogger(ServiceVerticle.class); Logger LOGGER = LoggerFactory.getLogger(ServiceVerticle.class);
private static final AtomicInteger ID = new AtomicInteger(1); private static final AtomicInteger ID = new AtomicInteger(1);
private static final Set<Class<?>> handlers; private static final Set<Class<?>> handlers;
private final List<String> registeredAddresses = new ArrayList<>(); private final List<MessageConsumer<JsonObject>> consumers = new ArrayList<>();
static { static {
Reflections reflections = ReflectionUtil.getReflections(); Reflections reflections = ReflectionUtil.getReflections();
@@ -43,8 +45,9 @@ public class ServiceVerticle extends AbstractVerticle {
serviceNames.append(asyncService.getName()).append("|"); serviceNames.append(asyncService.getName()).append("|");
BaseAsyncService asInstance = (BaseAsyncService) ReflectionUtil.newWithNoParam(asyncService); BaseAsyncService asInstance = (BaseAsyncService) ReflectionUtil.newWithNoParam(asyncService);
String address = asInstance.getAddress(); String address = asInstance.getAddress();
binder.setAddress(address).register(asInstance.getAsyncInterfaceClass(), asInstance); MessageConsumer<JsonObject> consumer = binder.setAddress(address)
registeredAddresses.add(address); .register(asInstance.getAsyncInterfaceClass(), asInstance);
consumers.add(consumer);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("Failed to register service: {}", asyncService.getName(), e); LOGGER.error("Failed to register service: {}", asyncService.getName(), e);
} }
@@ -57,16 +60,16 @@ public class ServiceVerticle extends AbstractVerticle {
@Override @Override
public void stop(Promise<Void> stopPromise) { public void stop(Promise<Void> stopPromise) {
ServiceBinder binder = new ServiceBinder(vertx); int count = consumers.size();
registeredAddresses.forEach(address -> { consumers.forEach(consumer -> {
try { try {
binder.setAddress(address).unregister(address); consumer.unregister();
} catch (Exception e) { } catch (Exception e) {
LOGGER.debug("Failed to unregister service at address: {}", address, e); LOGGER.warn("Failed to unregister service consumer at address: {}", consumer.address(), e);
} }
}); });
registeredAddresses.clear(); consumers.clear();
LOGGER.info("ServiceVerticle stopped, unregistered {} services", registeredAddresses.size()); LOGGER.info("ServiceVerticle stopped, unregistered {} services", count);
stopPromise.complete(); stopPromise.complete();
} }
} }