From 9022e987c21cb8e5edc5b5689570a1deb8004d85 Mon Sep 17 00:00:00 2001
From: Haidong Pang
Date: Mon, 25 May 2026 19:22:14 +0800
Subject: [PATCH] [storage]: prepare zbs hosts on attach

Resolves: ZSTAC-85462

Change-Id: Iba5d8a7f144cdaf367030867d202887f99eef77e
---
 .../addon/primary/ExternalPrimaryStorage.java |  78 +++++++
 .../addon/zbs/ZbsPrimaryStorageCase.groovy    | 203 ++++++++++++++++++
 2 files changed, 281 insertions(+)

diff --git a/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java b/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java
index 50c14d7513b..921c8431326 100644
--- a/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java
+++ b/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java
@@ -28,7 +28,10 @@
 import org.zstack.header.errorcode.ErrorCodeList;
 import org.zstack.header.errorcode.OperationFailureException;
 import org.zstack.header.host.HostInventory;
+import org.zstack.header.host.HostState;
+import org.zstack.header.host.HostStatus;
 import org.zstack.header.host.HostVO;
+import org.zstack.header.host.HostVO_;
 import org.zstack.header.image.ImageConstant;
 import org.zstack.header.image.ImageInventory;
 import org.zstack.header.image.ImageVO;
@@ -114,6 +117,81 @@ public ExternalPrimaryStorage(ExternalPrimaryStorage other) {
         this.selfConfig = other.selfConfig;
     }
 
+    /**
+     * Prepares the usable hosts of a cluster when this primary storage is attached to it.
+     *
+     * <p>Selects the cluster's hosts that are {@code Connected} and in neither
+     * {@code PreMaintenance} nor {@code Maintenance} state, deploys the storage client to
+     * each of them and, if no deployment reported an error, activates the heartbeat
+     * volume through the first selected host. With no such host the hook succeeds at once.
+     *
+     * @param clusterUuid uuid of the cluster whose hosts are prepared
+     * @param completion  succeeds once the heartbeat volume is activated (or no host
+     *                    qualifies); fails with the first deploy-client error or with the
+     *                    heartbeat activation error
+     */
+    @Override
+    public void attachHook(String clusterUuid, Completion completion) {
+        List<HostVO> hosts = Q.New(HostVO.class)
+                .eq(HostVO_.clusterUuid, clusterUuid)
+                .eq(HostVO_.status, HostStatus.Connected)
+                .notIn(HostVO_.state, Arrays.asList(HostState.PreMaintenance, HostState.Maintenance))
+                .list();
+        if (hosts.isEmpty()) {
+            completion.success();
+            return;
+        }
+
+        List<HostInventory> hostInventories = HostInventory.valueOf(hosts);
+        // Deploy-client is idempotent. Hosts prepared before an attach failure are left for
+        // the next attach or reconnect to overwrite.
+        new While<>(hostInventories).each((host, compl) -> {
+            node.deployClient(host, new Completion(compl) {
+                @Override
+                public void success() {
+                    compl.done();
+                }
+
+                @Override
+                public void fail(ErrorCode errorCode) {
+                    compl.addError(errorCode);
+                    compl.allDone();
+                }
+            });
+        }).run(new WhileDoneCompletion(completion) {
+            @Override
+            public void done(ErrorCodeList errorCodeList) {
+                if (!errorCodeList.getCauses().isEmpty()) {
+                    completion.fail(errorCodeList.getCauses().get(0));
+                    return;
+                }
+
+                activateHeartbeatVolumeForAttach(hostInventories.get(0), completion);
+            }
+        });
+    }
+
+    /**
+     * Activates the heartbeat volume through {@code host} and forwards the outcome.
+     *
+     * @param host       host used to issue the activation
+     * @param completion succeeds when activation succeeds (the returned topology is
+     *                   ignored); fails with the activation error otherwise
+     */
+    private void activateHeartbeatVolumeForAttach(HostInventory host, Completion completion) {
+        node.activateHeartbeatVolume(host, new ReturnValueCompletion<HeartbeatVolumeTopology>(completion) {
+            @Override
+            public void success(HeartbeatVolumeTopology returnValue) {
+                completion.success();
+            }
+
+            @Override
+            public void fail(ErrorCode errorCode) {
+                completion.fail(errorCode);
+            }
+        });
+    }
+
     @Override
     public void handleMessage(Message msg) {
         if (msg instanceof PrimaryStorageMessage && !destMaker.isManagedByUs(((PrimaryStorageMessage) msg).getPrimaryStorageUuid())) {
diff --git a/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy b/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy
index b86c8840e24..d938f1da8fc 100644
--- a/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy
+++ b/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy
@@ -12,6 +12,8 @@ import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageSpaceVO_
 import org.zstack.header.storage.addon.primary.PrimaryStorageOutputProtocolRefVO
 import org.zstack.header.storage.primary.PrimaryStorageCapacityVO
 import org.zstack.header.storage.primary.PrimaryStorageCapacityVO_
+import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO
+import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO_
 import org.zstack.header.storage.primary.PrimaryStorageHostRefVO
 import org.zstack.header.storage.primary.PrimaryStorageHostRefVO_
 import org.zstack.header.storage.primary.PrimaryStorageStatus
@@ -176,6 +178,9 @@ class ZbsPrimaryStorageCase extends SubCase {
         testSyncPrimaryStorageCapacityConcurrently()
         testDefaultConfig()
         testUpdateExternalPrimaryStorage()
+        testPrepareHostsWhenAttachPrimaryStorageToCluster()
+        testAttachPrimaryStorageFailsWhenPreparingUsableHostFails()
+        testAttachPrimaryStorageFailsWhenActivatingHeartbeatVolumeFails()
         testMdsConnectFailed()
         testLifecycle()
         testDataVolumeLifecycle()
@@ -486,7 +491,205 @@ class ZbsPrimaryStorageCase extends SubCase {
         assert rc.value == ZbsConstants.VOLUME_PHYSICAL_BLOCK_SIZE
     }
 
+    void testPrepareHostsWhenAttachPrimaryStorageToCluster() { // checks simulator calls made on attach, then on host reconnect
+        KVMHostInventory kvm2 = env.inventoryByName("kvm-2") as KVMHostInventory
+        KVMHostInventory kvm3 = env.inventoryByName("kvm-3") as KVMHostInventory
+        changeHostState { // kvm-3 gets the "maintain" event before attaching; presumably that is why only two deploys are expected below
+            uuid = kvm3.uuid
+            stateEvent = "maintain"
+        }
+        // Record each deploy-client request (by ip) and each heartbeat create-volume request (by logical pool).
+        List attachCalls = Collections.synchronizedList(new ArrayList<>())
+        AtomicInteger attachCheckHostStatusCount = new AtomicInteger(0)
+        env.afterSimulator(ZbsStorageController.DEPLOY_CLIENT_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.DeployClientCmd)
+            attachCalls.add("deploy:${cmd.ip}".toString())
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CREATE_VOLUME_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CreateVolumeCmd)
+            if (cmd.volume == ZbsConstants.ZBS_HEARTBEAT_VOLUME_NAME) {
+                attachCalls.add("heartbeat:${cmd.logicalPool}".toString())
+                ZbsStorageController.CreateVolumeRsp createVolumeRsp = new ZbsStorageController.CreateVolumeRsp()
+                createVolumeRsp.installPath = "zbs://${cmd.logicalPool}/${cmd.volume}"
+                return createVolumeRsp
+            }
+
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CHECK_HOST_STORAGE_CONNECTION_PATH) { rsp, HttpEntity e ->
+            attachCheckHostStatusCount.incrementAndGet()
+            return rsp
+        }
+
+        attachPrimaryStorageToCluster {
+            primaryStorageUuid = ps.uuid
+            clusterUuid = cluster.uuid
+        }
+        // Expect two deploys recorded first, one heartbeat volume per logical pool, and no host storage check.
+        assert attachCalls.findAll { it.startsWith("deploy:") }.sort() == ["deploy:127.0.0.1", "deploy:127.0.0.2"]
+        assert attachCalls.take(2).every { it.startsWith("deploy:") }
+        assert attachCalls.findAll { it.startsWith("heartbeat:") }.sort() == ["heartbeat:lpool1", "heartbeat:lpool2"]
+        assert attachCheckHostStatusCount.get() == 0
+        // Drop the attach-phase handlers, re-enable kvm-3 and wait until it reports Enabled/Connected.
+        env.cleanAfterSimulatorHandlers()
+        changeHostState {
+            uuid = kvm3.uuid
+            stateEvent = "enable"
+        }
+        retryInSecs {
+            HostInventory host = queryHost {
+                conditions = ["uuid=${kvm3.uuid}"]
+            }[0] as HostInventory
+            assert host.state == "Enabled"
+            assert host.status == "Connected"
+        }
+        // Second phase: record what reconnecting kvm-2 sends (deploys are recorded only for ip 127.0.0.2).
+        List reconnectCalls = Collections.synchronizedList(new ArrayList<>())
+        AtomicInteger reconnectCheckHostStatusCount = new AtomicInteger(0)
+        env.afterSimulator(ZbsStorageController.DEPLOY_CLIENT_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.DeployClientCmd)
+            if (cmd.ip == "127.0.0.2") {
+                reconnectCalls.add("deploy:${cmd.ip}".toString())
+            }
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CREATE_VOLUME_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CreateVolumeCmd)
+            if (cmd.volume == ZbsConstants.ZBS_HEARTBEAT_VOLUME_NAME) {
+                reconnectCalls.add("heartbeat:${cmd.logicalPool}".toString())
+                ZbsStorageController.CreateVolumeRsp createVolumeRsp = new ZbsStorageController.CreateVolumeRsp()
+                createVolumeRsp.installPath = "zbs://${cmd.logicalPool}/${cmd.volume}"
+                return createVolumeRsp
+            }
+
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CHECK_HOST_STORAGE_CONNECTION_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CheckHostStorageConnectionCmd)
+            if (cmd.hostUuid == kvm2.uuid) {
+                reconnectCheckHostStatusCount.incrementAndGet()
+            }
+            ZbsStorageController.CheckHostStorageConnectionRsp checkHostStorageConnectionRsp = new ZbsStorageController.CheckHostStorageConnectionRsp()
+            checkHostStorageConnectionRsp.success = true
+            return checkHostStorageConnectionRsp
+        }
+
+        reconnectHost {
+            uuid = kvm2.uuid
+        }
+
+        assert reconnectCalls.contains("deploy:127.0.0.2")
+        assert reconnectCalls.containsAll(["heartbeat:lpool1", "heartbeat:lpool2"])
+        assert reconnectCheckHostStatusCount.get() > 0
+        // Detach again if the cluster ref exists; the tests that follow attach this storage themselves.
+        if (Q.New(PrimaryStorageClusterRefVO.class)
+                .eq(PrimaryStorageClusterRefVO_.primaryStorageUuid, ps.uuid)
+                .eq(PrimaryStorageClusterRefVO_.clusterUuid, cluster.uuid)
+                .isExists()) {
+            detachPrimaryStorageFromCluster {
+                primaryStorageUuid = ps.uuid
+                clusterUuid = cluster.uuid
+            }
+        }
+    }
+
+    void testAttachPrimaryStorageFailsWhenPreparingUsableHostFails() { // a failed deploy-client must fail the attach
+        env.cleanAfterSimulatorHandlers()
+        // Remember the storage status so it can be compared after the failed attach.
+        PrimaryStorageStatus statusBeforeAttach = Q.New(ExternalPrimaryStorageVO.class)
+                .select(ExternalPrimaryStorageVO_.status)
+                .eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
+                .findValue()
+        AtomicInteger failedDeployCount = new AtomicInteger(0)
+        AtomicInteger heartbeatCount = new AtomicInteger(0)
+        env.afterSimulator(ZbsStorageController.DEPLOY_CLIENT_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.DeployClientCmd)
+            if (cmd.ip == "127.0.0.1") {
+                failedDeployCount.incrementAndGet()
+                ZbsStorageController.DeployClientRsp deployClientRsp = new ZbsStorageController.DeployClientRsp()
+                deployClientRsp.success = false
+                deployClientRsp.error = "on purpose"
+                return deployClientRsp
+            }
+
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CREATE_VOLUME_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CreateVolumeCmd)
+            if (cmd.volume == ZbsConstants.ZBS_HEARTBEAT_VOLUME_NAME) {
+                heartbeatCount.incrementAndGet()
+            }
+
+            return rsp
+        }
+        // expect(...) passes only if the attach call raises AssertionError.
+        expect(AssertionError.class) {
+            attachPrimaryStorageToCluster {
+                primaryStorageUuid = ps.uuid
+                clusterUuid = cluster.uuid
+            }
+        }
+        // Afterwards: no cluster ref, unchanged status, exactly one failed deploy and no heartbeat request.
+        assert !Q.New(PrimaryStorageClusterRefVO.class)
+                .eq(PrimaryStorageClusterRefVO_.primaryStorageUuid, ps.uuid)
+                .eq(PrimaryStorageClusterRefVO_.clusterUuid, cluster.uuid)
+                .isExists()
+        assert Q.New(ExternalPrimaryStorageVO.class)
+                .select(ExternalPrimaryStorageVO_.status)
+                .eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
+                .findValue() == statusBeforeAttach
+        assert failedDeployCount.get() == 1
+        assert heartbeatCount.get() == 0
+    }
+
+    void testAttachPrimaryStorageFailsWhenActivatingHeartbeatVolumeFails() { // a failed heartbeat create-volume must fail the attach
+        env.cleanAfterSimulatorHandlers()
+        // Remember the storage status so it can be compared after the failed attach.
+        PrimaryStorageStatus statusBeforeAttach = Q.New(ExternalPrimaryStorageVO.class)
+                .select(ExternalPrimaryStorageVO_.status)
+                .eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
+                .findValue()
+        AtomicInteger deployCount = new AtomicInteger(0)
+        AtomicInteger failedHeartbeatCount = new AtomicInteger(0)
+        env.afterSimulator(ZbsStorageController.DEPLOY_CLIENT_PATH) { rsp, HttpEntity e ->
+            deployCount.incrementAndGet()
+            return rsp
+        }
+        env.afterSimulator(ZbsStorageController.CREATE_VOLUME_PATH) { rsp, HttpEntity e ->
+            def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CreateVolumeCmd)
+            if (cmd.volume == ZbsConstants.ZBS_HEARTBEAT_VOLUME_NAME) {
+                failedHeartbeatCount.incrementAndGet()
+                ZbsStorageController.CreateVolumeRsp createVolumeRsp = new ZbsStorageController.CreateVolumeRsp()
+                createVolumeRsp.setError("on purpose")
+                return createVolumeRsp
+            }
+
+            return rsp
+        }
+        // expect(...) passes only if the attach call raises AssertionError.
+        expect(AssertionError.class) {
+            attachPrimaryStorageToCluster {
+                primaryStorageUuid = ps.uuid
+                clusterUuid = cluster.uuid
+            }
+        }
+        // Afterwards: no cluster ref, unchanged status, at least one deploy and exactly one heartbeat attempt.
+        assert !Q.New(PrimaryStorageClusterRefVO.class)
+                .eq(PrimaryStorageClusterRefVO_.primaryStorageUuid, ps.uuid)
+                .eq(PrimaryStorageClusterRefVO_.clusterUuid, cluster.uuid)
+                .isExists()
+        assert Q.New(ExternalPrimaryStorageVO.class)
+                .select(ExternalPrimaryStorageVO_.status)
+                .eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
+                .findValue() == statusBeforeAttach
+        assert deployCount.get() > 0
+        assert failedHeartbeatCount.get() == 1
+    }
+
     void testLifecycle() {
+        env.cleanAfterSimulatorHandlers() // clear any after-simulator handlers registered earlier
+
         updateExternalPrimaryStorage {
             uuid = ps.uuid
             name = "test-zbs-new-name"