From 393d19d385f894271f03d790a5da28b3e003db8d Mon Sep 17 00:00:00 2001 From: "shan.wu" Date: Mon, 23 Dec 2024 14:17:31 +0800 Subject: [PATCH 1/2] [agent]: support agent memory monitoring and alarm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、support agent memory monitoring and alarm 2、support automatic dump of thread stack and tracking of objects when the memory continues to increase 3、automatically restart the process when the memory exceeds the threshold Resolves/Related: ZSTAC-71000 Change-Id: I676f656f6f6d6b6565656e737677666667767170 (cherry picked from commit 8fc2e9d530b5c7a27937c6cc20c9dfd8130223b0) --- .../zstack/compute/host/HostTrackImpl.java | 28 +++ conf/globalConfig/kvm.xml | 16 ++ .../java/org/zstack/core/CoreManagerImpl.java | 3 + .../zstack/header/core/GetLocalTaskMsg.java | 9 + .../header/host/HostCanonicalEvents.java | 66 +++++ .../java/org/zstack/kvm/KVMAgentCommands.java | 80 ++++++ .../main/java/org/zstack/kvm/KVMConstant.java | 3 + .../java/org/zstack/kvm/KVMGlobalConfig.java | 12 +- .../src/main/java/org/zstack/kvm/KVMHost.java | 232 +++++++++++++++++- .../java/org/zstack/kvm/KVMHostFactory.java | 80 +++++- .../org/zstack/kvm/RestartKvmAgentMsg.java | 26 ++ .../org/zstack/kvm/RestartKvmAgentReply.java | 6 + .../integration/kvm/host/KVMPingCase.groovy | 27 ++ test/src/test/resources/globalConfig/kvm.xml | 16 ++ 14 files changed, 592 insertions(+), 12 deletions(-) create mode 100644 plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java create mode 100644 plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java diff --git a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java index 3d5684914b0..49394e88b78 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java +++ b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java @@ -1,5 +1,7 @@ package org.zstack.compute.host; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.zstack.core.CoreGlobalProperty; import org.zstack.core.cloudbus.*; @@ -35,6 +37,7 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener, private Map trackers = new ConcurrentHashMap<>(); private static boolean alwaysStartRightNow = false; + private static final Cache skippedPingHostDeadline = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build(); @Autowired private DatabaseFacade dbf; @@ -124,6 +127,12 @@ private void track() { return; } + if (skippedPingHostDeadline.getIfPresent(uuid) != null && System.currentTimeMillis() / 1000 <= skippedPingHostDeadline.getIfPresent(uuid)) { + logger.debug(String.format("skip tracking host[uuid:%s] this time, deadline %s", uuid, skippedPingHostDeadline.getIfPresent(uuid))); + continueToRunThisTimer(); + return; + } + PingHostMsg msg = new PingHostMsg(); msg.setHostUuid(uuid); bus.makeLocalServiceId(msg, HostConstant.SERVICE_ID); @@ -341,6 +350,7 @@ private HostReconnectTaskFactory getHostReconnectTaskFactory(String hvType) { public boolean start() { populateExtensions(); onHostStatusChange(); + onHostPingSkip(); HostGlobalConfig.PING_HOST_INTERVAL.installUpdateExtension((oldConfig, newConfig) -> { logger.debug(String.format("%s change from %s to %s, restart host trackers", @@ -386,6 +396,24 @@ protected void run(Map tokens, Object data) { }); } + private void onHostPingSkip() { + evtf.on(HostCanonicalEvents.HOST_PING_SKIP, new EventCallback() { + @Override + protected void run(Map tokens, Object data) { + HostCanonicalEvents.HostPingSkipData d = (HostCanonicalEvents.HostPingSkipData) data; + Long deadline = System.currentTimeMillis() / 1000 + d.getSkipTimeInSec(); + skippedPingHostDeadline.put(d.getHostUuid(), deadline); + } + }); + evtf.on(HostCanonicalEvents.HOST_PING_CANCEL_SKIP, new EventCallback() { + @Override + protected void run(Map tokens, Object data) { + HostCanonicalEvents.HostPingSkipData d = (HostCanonicalEvents.HostPingSkipData) data; + skippedPingHostDeadline.invalidate(d.getHostUuid()); + } + }); + } + @Override public boolean stop() { return true; diff --git a/conf/globalConfig/kvm.xml b/conf/globalConfig/kvm.xml index 771709113ce..f600dbb12bc 100755 --- a/conf/globalConfig/kvm.xml +++ b/conf/globalConfig/kvm.xml @@ -257,4 +257,20 @@ 1800 java.lang.Integer + + + kvm + kvmagent.physicalmemory.usage.alarm.threshold + The threshold for the physical memory usage of the kvmagent process, exceeding which an alarm will be triggered. + 2147483648 + java.lang.Long + + + + kvm + kvmagent.physicalmemory.usage.hardlimit + The hard limit for the physical memory usage of the kvmagent process, exceeding this value will trigger a kvmagent restart. + 10737418240 + java.lang.Long + diff --git a/core/src/main/java/org/zstack/core/CoreManagerImpl.java b/core/src/main/java/org/zstack/core/CoreManagerImpl.java index 909a0b25405..08a86f57c15 100644 --- a/core/src/main/java/org/zstack/core/CoreManagerImpl.java +++ b/core/src/main/java/org/zstack/core/CoreManagerImpl.java @@ -98,6 +98,9 @@ private void handle(GetLocalTaskMsg msg) { GetLocalTaskReply reply = new GetLocalTaskReply(); Map results = msg.getSyncSignatures().stream() .collect(Collectors.toMap(Function.identity(), thdf::getChainTaskInfo)); + if (msg.isOnlyRunningTask()) { + results.values().forEach(c -> c.getPendingTask().clear()); + } reply.setResults(results); bus.reply(msg, reply); } diff --git a/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java b/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java index 9e41fd36b3b..6a7675b52ac 100644 --- a/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java +++ b/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java @@ -6,6 +6,15 @@ public class GetLocalTaskMsg extends NeedReplyMessage { private List syncSignatures; + private boolean onlyRunningTask; + + public boolean isOnlyRunningTask() { + return onlyRunningTask; + } + + public void setOnlyRunningTask(boolean onlyRunningTask) { + this.onlyRunningTask = onlyRunningTask; + } public void setSyncSignatures(List syncSignatures) { this.syncSignatures = syncSignatures; diff --git a/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java b/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java index a0ecc4c0046..3350d926a1e 100755 --- a/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java +++ b/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java @@ -23,6 +23,10 @@ public class HostCanonicalEvents { public static final String HOST_PHYSICAL_DISK_STATUS_ABNORMAL = "/host/physicalDisk/status/abnormal"; public static final String HOST_PHYSICAL_DISK_INSERT_TRIGGERED = "/host/physicalDisk/insert/triggered"; public static final String HOST_PHYSICAL_DISK_REMOVE_TRIGGERED = "/host/physicalDisk/remove/triggered"; + public static final String HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ABNORMAL = "/host/process/physicalMemory/usage/abnormal"; + public static final String HOST_PING_SKIP = "/host/ping/skip"; + public static final String HOST_PING_CANCEL_SKIP = "/host/ping/cancel/skip"; + @NeedJsonSchema public static class HostPhysicalCpuStatusAbnormalData { @@ -399,4 +403,66 @@ public void setInterfaceStatus(String interfaceStatus) { this.interfaceStatus = interfaceStatus; } } + + @NeedJsonSchema + public static class HostProcessPhysicalMemoryUsageAlarmData { + private String hostUuid; + private String pid; + private String processName; + private String memoryUsage; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + public String getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(String memoryUsage) { + this.memoryUsage = memoryUsage; + } + } + + @NeedJsonSchema + public static class HostPingSkipData { + private String hostUuid; + private int skipTimeInSec; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public int getSkipTimeInSec() { + return skipTimeInSec; + } + + public void setSkipTimeInSec(int skipTimeInSec) { + this.skipTimeInSec = skipTimeInSec; + } + } } diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java index 2c8bdc4a3e4..c6fe4ca154d 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java @@ -330,6 +330,7 @@ public void setQemuVersion(String qemuVersion) { public static class PingCmd extends AgentCommand { public String hostUuid; + public Map configs; } public static class PingResponse extends AgentResponse { @@ -4127,4 +4128,83 @@ public void setImageData(String imageData) { this.imageData = imageData; } } + + public static class HostProcessPhysicalMemoryUsageAlarmCmd { + private String hostUuid; + private String pid; + private String processName; + private long memoryUsage; + private Map additionalProperties = new HashMap<>(); + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + public long getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(long memoryUsage) { + this.memoryUsage = memoryUsage; + } + + public Map getAdditionalProperties() { + return additionalProperties; + } + + public void setAdditionalProperties(Map additionalProperties) { + this.additionalProperties = additionalProperties; + } + } + + public static class HostKvmagentStatusCmd { + private String status; + private String hostUuid; + private long memoryUsage; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public long getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(long memoryUsage) { + this.memoryUsage = memoryUsage; + } + } + } diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java index 12a0d0a77c4..65edf86fa64 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java @@ -109,6 +109,9 @@ public interface KVMConstant { String KVM_BLOCK_COMMIT_VOLUME_PATH = "/vm/volume/blockcommit"; String TAKE_VM_CONSOLE_SCREENSHOT_PATH = "/vm/console/screenshot"; + String HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ALARM_PATH = "/host/process/physicalMemory/usage/alarm"; + String HOST_KVMAGENT_STATUS_PATH = "/host/kvmagent/status"; + String KVM_AGENT_OWNER = "kvm"; String ALI_REPO = "ali"; diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java index 17ca18ec2a5..71e9cc8d20d 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java @@ -1,6 +1,5 @@ package org.zstack.kvm; -import org.zstack.core.GlobalProperty; import org.zstack.core.config.GlobalConfig; import org.zstack.core.config.GlobalConfigDef; import org.zstack.core.config.GlobalConfigDefinition; @@ -130,4 +129,15 @@ public class KVMGlobalConfig { @GlobalConfigDef(defaultValue = "none", description = "enable host ksm") @BindResourceConfig({HostVO.class}) public static GlobalConfig HOST_KSM = new GlobalConfig(CATEGORY, "host.ksm"); + + @GlobalConfigValidation(validValues = {"true", "false"}) + @GlobalConfigDef(defaultValue = "false", description = "restart kvm host libvirtd service or not") + @BindResourceConfig({HostVO.class, ClusterVO.class}) + public static GlobalConfig RECONNECT_HOST_RESTART_LIBVIRTD_SERVICE = new GlobalConfig(CATEGORY, "reconnect.host.restart.libvirtd.service"); + + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD = new GlobalConfig(CATEGORY, "kvmagent.physicalmemory.usage.alarm.threshold"); + + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT = new GlobalConfig(CATEGORY, "kvmagent.physicalmemory.usage.hardlimit"); } diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java index 40dd23224c1..9cf4a1ac697 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java @@ -21,8 +21,10 @@ import org.zstack.core.Platform; import org.zstack.core.agent.AgentConstant; import org.zstack.core.ansible.*; +import org.zstack.core.asyncbatch.While; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.cloudbus.CloudBusGlobalProperty; +import org.zstack.core.cloudbus.ResourceDestinationMaker; import org.zstack.core.componentloader.PluginRegistry; import org.zstack.core.db.Q; import org.zstack.core.db.SQL; @@ -33,6 +35,7 @@ import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.core.workflow.ShareFlow; +import org.zstack.core.workflow.SimpleFlowChain; import org.zstack.header.Constants; import org.zstack.header.allocator.DesignatedAllocateHostMsg; import org.zstack.header.allocator.HostAllocatorConstant; @@ -40,10 +43,9 @@ import org.zstack.header.cluster.ClusterInventory; import org.zstack.header.cluster.ClusterVO; import org.zstack.header.cluster.ReportHostCapacityMessage; -import org.zstack.header.core.AsyncLatch; -import org.zstack.header.core.Completion; -import org.zstack.header.core.NoErrorCompletion; -import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.core.*; +import org.zstack.header.core.progress.ChainInfo; +import org.zstack.header.core.progress.TaskInfo; import org.zstack.header.core.progress.TaskProgressRange; import org.zstack.header.core.workflow.*; import org.zstack.header.errorcode.ErrorCode; @@ -107,6 +109,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -158,6 +161,8 @@ public class KVMHost extends HostBase implements Host { private TimeHelper timeHelper; @Autowired private AccountManager accountMgr; + @Autowired + private ResourceDestinationMaker destMaker; private KVMHostContext context; @@ -679,11 +684,229 @@ protected void handleLocalMessage(Message msg) { handle((CommitVolumeOnHypervisorMsg) msg); } else if (msg instanceof TakeVmConsoleScreenshotMsg) { handle((TakeVmConsoleScreenshotMsg) msg); + } else if (msg instanceof RestartKvmAgentMsg) { + handle((RestartKvmAgentMsg) msg); } else { super.handleLocalMessage(msg); } } + private void handle(RestartKvmAgentMsg msg) { + RestartKvmAgentReply reply = new RestartKvmAgentReply(); + thdf.singleFlightSubmit(new SingleFlightTask(msg) + .setSyncSignature(String.format("restart-kvmagent-on-host-%s", msg.getHostUuid())) + .run(completion -> { + if (!destMaker.isManagedByUs(msg.getHostUuid())) { + completion.fail(operr("host %s is not managed by current mn node", msg.getHostUuid())); + return; + } + + restartKvmAgentOnHost(msg.isForce(), new Completion(completion) { + @Override + public void success() { + completion.success(null); + } + + @Override + public void fail(ErrorCode errorCode) { + completion.fail(errorCode); + } + }); + }).done(result -> { + if (!result.isSuccess()) { + reply.setError(result.getErrorCode()); + } + bus.reply(msg, reply); + }) + ); + } + + private void noRunningTaskOnHost(ReturnValueCompletion completion) { + String syncSignature = Host.buildId(self.getUuid()); + List nodeUuids = destMaker.getAllNodeInfo().stream().map(ResourceDestinationMaker.NodeInfo::getNodeUuid) + .collect(Collectors.toList()); + + AtomicBoolean queueIsEmpty = new AtomicBoolean(true); + new While<>(nodeUuids).step((mnId, compl) -> { + GetLocalTaskMsg gmsg = new GetLocalTaskMsg(); + gmsg.setOnlyRunningTask(true); + gmsg.setSyncSignatures(Collections.singletonList(syncSignature)); + bus.makeServiceIdByManagementNodeId(gmsg, CoreConstant.SERVICE_ID, mnId); + bus.send(gmsg, new CloudBusCallBack(compl) { + private String buildTaskInfo(List tasks) { + return tasks.stream().map(TaskInfo::getContext).collect(Collectors.joining("\n")); + } + + @Override + public void run(MessageReply r) { + if (!r.isSuccess()) { + compl.addError(r.getError()); + compl.allDone(); + return; + } + GetLocalTaskReply gr = r.castReply(); + ChainInfo chainInfo = gr.getResults().get(syncSignature); + if (!chainInfo.getRunningTask().isEmpty()) { + logger.debug(String.format("%s tasks exist on the running task of host %s, mnNodeId %s: %s...", chainInfo.getRunningTask().size(), + self.getUuid(), mnId, buildTaskInfo(chainInfo.getRunningTask()))); + queueIsEmpty.set(false); + compl.allDone(); + return; + } + compl.done(); + } + }); + }, 2).run(new WhileDoneCompletion(completion) { + @Override + public void done(ErrorCodeList errorCodeList) { + if (!errorCodeList.getCauses().isEmpty()) { + completion.fail(errorCodeList); + return; + } + completion.success(queueIsEmpty.get()); + } + }); + } + + private void restartKvmAgentOnHost(boolean force, Completion completion) { + SimpleFlowChain chain = new SimpleFlowChain(); + chain.setChainName("try-restart-kvmagent-on-host-" + self.getUuid()); + chain.then(new Flow() { + // changing the host connection status to 'Connecting' is to prevent sending HTTP requests to the kvmagent, + // which needs to be done before checking the host task queue + String __name__ = "change host state to connecting"; + + @Override + public void run(FlowTrigger trigger, Map data) { + if (self.getStatus() != HostStatus.Connected) { + trigger.fail(operr("host %s is not connected, skip to restart kvmagent", self.getUuid())); + return; + } + changeConnectionState(HostStatusEvent.connecting); + trigger.next(); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (self.getStatus() == HostStatus.Connecting) { + changeConnectionState(HostStatusEvent.connected); + } + trigger.rollback(); + } + }).then(new NoRollbackFlow() { + String __name__ = "check if the host task queue is empty"; + @Override + public boolean skip(Map data) { + return force; + } + + @Override + public void run(FlowTrigger trigger, Map data) { + noRunningTaskOnHost(new ReturnValueCompletion(trigger) { + @Override + public void success(Boolean noTask) { + if (noTask) { + trigger.next(); + } else { + trigger.fail(operr("running task exists on host %s", self.getUuid())); + } + } + + @Override + public void fail(ErrorCode errorCode) { + trigger.fail(errorCode); + } + }); + } + }).then(new NoRollbackFlow() { + String __name__ = "restart kvmagent on host " + self.getUuid(); + @Override + public boolean skip(Map data) { + return CoreGlobalProperty.UNIT_TEST_ON; + } + + @Override + public void run(FlowTrigger trigger, Map data) { + SshShell sshShell = new SshShell(); + sshShell.setHostname(getSelf().getManagementIp()); + sshShell.setUsername(getSelf().getUsername()); + sshShell.setPassword(getSelf().getPassword()); + sshShell.setPort(getSelf().getPort()); + SshResult ret = sshShell.runCommand("sudo service zstack-kvmagent restart"); + + if (ret.isSshFailure() || ret.getReturnCode() != 0) { + trigger.fail(operr(ret.getExitErrorMessage())); + } else { + trigger.next(); + } + } + }).then(new NoRollbackFlow() { + String __name__ = "wait for kvmagent to be ready"; + + @Override + public void run(FlowTrigger trigger, Map data) { + int retryCount = 60; + While.makeRetryWhile(retryCount).each((currentStep, compl) -> { + PingCmd cmd = new PingCmd(); + cmd.hostUuid = self.getUuid(); + restf.asyncJsonPost(pingPath, cmd, new JsonAsyncRESTCallback(compl) { + @Override + public void fail(ErrorCode err) { + try { + if (currentStep < retryCount) { + TimeUnit.SECONDS.sleep(1); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + compl.addError(err); + compl.done(); + } + } + + @Override + public void success(PingResponse ret) { + compl.allDone(); + } + + @Override + public Class getReturnClass() { + return PingResponse.class; + } + }, TimeUnit.SECONDS, HostGlobalConfig.PING_HOST_TIMEOUT.value(Long.class)); + }).run(new WhileDoneCompletion(trigger) { + @Override + public void done(ErrorCodeList errorCodeList) { + if (errorCodeList.getCauses().size() == retryCount) { + logger.debug("waiting for kvmagent to start timeout: " + errorCodeList.getCauses().get(0).getDetails()); + } + trigger.next(); + } + }); + } + }).then(new NoRollbackFlow() { + String __name__ = String.format("reconnect host %s after restart kvmagent", self.getUuid()); + + public void run(FlowTrigger trigger, Map data) { + ReconnectHostMsg rmsg = new ReconnectHostMsg(); + rmsg.setHostUuid(self.getUuid()); + bus.makeTargetServiceIdByResourceUuid(rmsg, HostConstant.SERVICE_ID, self.getUuid()); + bus.send(rmsg); + trigger.next(); + } + }).done(new FlowDoneHandler(completion) { + @Override + public void handle(Map data) { + completion.success(); + } + }).error(new FlowErrorHandler(completion) { + @Override + public void handle(ErrorCode errCode, Map data) { + completion.fail(errCode); + } + }).start(); + } + private void handle(TakeVmConsoleScreenshotMsg msg) { TakeVmConsoleScreenshotReply reply = new TakeVmConsoleScreenshotReply(); thdf.singleFlightSubmit(new SingleFlightTask(msg) @@ -4596,6 +4819,7 @@ public void setup() { public void run(FlowTrigger trigger, Map data) { PingCmd cmd = new PingCmd(); cmd.hostUuid = self.getUuid(); + cmd.configs = KVMHostFactory.configs; restf.asyncJsonPost(pingPath, cmd, new JsonAsyncRESTCallback(trigger) { @Override public void fail(ErrorCode err) { diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java index 3784f2a76a9..c0f7db1096d 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java @@ -7,10 +7,9 @@ import org.zstack.compute.vm.CrashStrategy; import org.zstack.compute.vm.VmGlobalConfig; import org.zstack.compute.vm.VmNicManager; -import org.zstack.compute.vm.VmNicManagerImpl; import org.zstack.header.errorcode.ErrorCode; -import org.zstack.header.network.l2.L2NetworkRealizationExtensionPoint; -import org.zstack.header.network.l2.VSwitchType; +import org.zstack.core.config.*; +import org.zstack.core.config.schema.GuestOsCharacter; import org.zstack.header.tag.SystemTagInventory; import org.zstack.header.tag.SystemTagLifeCycleListener; import org.zstack.header.tag.SystemTagValidator; @@ -27,7 +26,6 @@ import org.zstack.core.config.GlobalConfigUpdateExtensionPoint; import org.zstack.core.config.GlobalConfigValidatorExtensionPoint; import org.zstack.core.config.schema.GuestOsCategory; -import org.zstack.core.config.schema.GuestOsCharacter; import org.zstack.core.db.DatabaseFacade; import org.zstack.core.db.Q; import org.zstack.core.db.SQL; @@ -56,12 +54,11 @@ import org.zstack.header.volume.*; import org.zstack.kvm.KVMAgentCommands.ReconnectMeCmd; import org.zstack.kvm.KVMAgentCommands.TransmitVmOperationToMnCmd; -import org.zstack.resourceconfig.ResourceConfigUpdateExtensionPoint; -import org.zstack.resourceconfig.ResourceConfigValidatorExtensionPoint; import org.zstack.utils.CollectionUtils; import org.zstack.utils.IpRangeSet; import org.zstack.utils.SizeUtils; import org.zstack.utils.Utils; +import org.zstack.utils.data.SizeUnit; import org.zstack.utils.form.Form; import org.zstack.utils.function.Function; import org.zstack.utils.function.ValidateFunction; @@ -110,6 +107,7 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory public static Map allGuestOsCharacter = new ConcurrentHashMap<>(); private final Map socketTimeoutMap = new ConcurrentHashMap<>(); + public static final Map configs = Collections.synchronizedMap(new HashMap<>()); static { RAW_FORMAT.newFormatInputOutputMapping(hypervisorType, QCOW2_FORMAT.toString()); @@ -141,8 +139,10 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory private VmInstanceDeviceManager vidm; @Autowired private VmNicManager vmNicManager; - + @Autowired + private GlobalConfigFacade gcf; private Future checkSocketChannelTimeoutThread; + public static int skipHostPingTimeWhenKvmagentBusy = 300; @Override public HostVO createHost(HostVO vo, AddHostMessage msg) { @@ -355,6 +355,21 @@ private void deployAnsibleModule() { asf.deployModule(KVMConstant.ANSIBLE_MODULE_PATH, KVMConstant.ANSIBLE_PLAYBOOK_NAME); } + + private void processKvmagentPhysicalMemUsageAbnormal(KVMAgentCommands.HostProcessPhysicalMemoryUsageAlarmCmd cmd) { + if (cmd.getMemoryUsage() <= gcf.getConfigValue(KVMGlobalConfig.CATEGORY, + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName(), Long.class)) { + return; + } + + logger.debug("The zstack-kvmagent service has exceeded the hard limit for physical memory usage, " + + "and we will try restart it later"); + RestartKvmAgentMsg restartKvmAgentMsg = new RestartKvmAgentMsg(); + restartKvmAgentMsg.setHostUuid(cmd.getHostUuid()); + bus.makeTargetServiceIdByResourceUuid(restartKvmAgentMsg, HostConstant.SERVICE_ID, restartKvmAgentMsg.getHostUuid()); + bus.send(restartKvmAgentMsg); + } + @Override public boolean start() { deployAnsibleModule(); @@ -400,6 +415,20 @@ public void validateGlobalConfig(String category, String name, String oldValue, } } }); + configs.put(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.getName(), KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.value(Long.class)); + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.installUpdateExtension(new GlobalConfigUpdateExtensionPoint() { + @Override + public void updateGlobalConfig(GlobalConfig oldConfig, GlobalConfig newConfig) { + configs.put(newConfig.getName(), newConfig.value(Long.class)); + } + }); + configs.put(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName(), KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.value(Long.class)); + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.installUpdateExtension(new GlobalConfigUpdateExtensionPoint() { + @Override + public void updateGlobalConfig(GlobalConfig oldConfig, GlobalConfig newConfig) { + configs.put(newConfig.getName(), newConfig.value(Long.class)); + } + }); ResourceConfig resourceConfig = rcf.getResourceConfig(KVMGlobalConfig.VM_CPU_HYPERVISOR_FEATURE.getIdentity()); resourceConfig.installValidatorExtension((resourceUuid, oldValue, newValue) -> { if (Boolean.TRUE.toString().equals(newValue)) { @@ -503,6 +532,43 @@ public String handleSyncHttpCall(KVMAgentCommands.ReportVmRebootEventCmd cmd) { return null; }); + + restf.registerSyncHttpCallHandler(KVMConstant.HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ALARM_PATH, KVMAgentCommands.HostProcessPhysicalMemoryUsageAlarmCmd.class, cmd -> { + HostCanonicalEvents.HostProcessPhysicalMemoryUsageAlarmData data = new HostCanonicalEvents.HostProcessPhysicalMemoryUsageAlarmData(); + data.setHostUuid(cmd.getHostUuid()); + data.setPid(cmd.getPid()); + data.setMemoryUsage(String.format("%s MB", cmd.getMemoryUsage() / 1048576)); + data.setProcessName(cmd.getProcessName()); + evf.fire(HostCanonicalEvents.HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ABNORMAL, data); + + switch (data.getProcessName()) { + case "zstack-kvmagent": + processKvmagentPhysicalMemUsageAbnormal(cmd); + break; + default: + logger.debug(String.format("unknown process name[%s] in host[uuid:%s]", cmd.getProcessName(), cmd.getHostUuid())); + } + + return null; + }); + + restf.registerSyncHttpCallHandler(KVMConstant.HOST_KVMAGENT_STATUS_PATH, KVMAgentCommands.HostKvmagentStatusCmd.class, cmd -> { + if ("busy".equals(cmd.getStatus())) { + HostCanonicalEvents.HostPingSkipData data = new HostCanonicalEvents.HostPingSkipData(); + data.setHostUuid(cmd.getHostUuid()); + // this will skip host ping sometime if kvmagent busy + data.setSkipTimeInSec(skipHostPingTimeWhenKvmagentBusy * (int)(cmd.getMemoryUsage() / SizeUnit.GIGABYTE.toByte(4) + 1)); + evf.fire(HostCanonicalEvents.HOST_PING_SKIP, data); + } else if ("available".equals(cmd.getStatus())) { + HostCanonicalEvents.HostPingSkipData data = new HostCanonicalEvents.HostPingSkipData(); + data.setHostUuid(cmd.getHostUuid()); + evf.fire(HostCanonicalEvents.HOST_PING_CANCEL_SKIP, data); + } else { + logger.debug("unknown kvmagent status: " + cmd.getStatus()); + } + return null; + }); + KVMSystemTags.CHECK_CLUSTER_CPU_MODEL.installValidator(((resourceUuid, resourceType, systemTag) -> { String check = KVMSystemTags.CHECK_CLUSTER_CPU_MODEL.getTokenByTag(systemTag, KVMSystemTags.CHECK_CLUSTER_CPU_MODEL_TOKEN); diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java new file mode 100644 index 00000000000..008b653af7f --- /dev/null +++ b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java @@ -0,0 +1,26 @@ +package org.zstack.kvm; + +import org.zstack.header.host.HostMessage; +import org.zstack.header.message.NeedReplyMessage; + +public class RestartKvmAgentMsg extends NeedReplyMessage implements HostMessage { + String hostUuid; + boolean force; + + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + @Override + public String getHostUuid() { + return hostUuid; + } +} diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java new file mode 100644 index 00000000000..e26f22981e0 --- /dev/null +++ b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java @@ -0,0 +1,6 @@ +package org.zstack.kvm; + +import org.zstack.header.message.MessageReply; + +public class RestartKvmAgentReply extends MessageReply { +} diff --git a/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy b/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy index 25697e33f4d..7948dadb3f4 100755 --- a/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy @@ -6,6 +6,7 @@ import org.zstack.compute.host.HostManagerImpl import org.zstack.compute.host.HostReconnectTask import org.zstack.compute.host.HostTrackImpl import org.zstack.core.cloudbus.CloudBus +import org.zstack.core.config.GlobalConfigFacadeImpl import org.zstack.core.db.Q import org.zstack.core.db.SQL import org.zstack.header.core.NoErrorCompletion @@ -13,6 +14,7 @@ import org.zstack.header.errorcode.SysErrors import org.zstack.header.host.* import org.zstack.kvm.KVMAgentCommands import org.zstack.kvm.KVMConstant +import org.zstack.kvm.KVMGlobalConfig import org.zstack.kvm.KVMReconnectHostTask import org.zstack.sdk.ClusterInventory import org.zstack.sdk.HostInventory @@ -20,6 +22,7 @@ import org.zstack.test.integration.kvm.KvmTest import org.zstack.testlib.EnvSpec import org.zstack.testlib.SubCase import org.zstack.utils.FieldUtils +import org.zstack.utils.SizeUtils import org.zstack.utils.gson.JSONObjectUtil import java.util.concurrent.TimeUnit @@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit class KVMPingCase extends SubCase { EnvSpec env CloudBus bus + GlobalConfigFacadeImpl gcf @Override void clean() { @@ -98,8 +102,12 @@ class KVMPingCase extends SubCase { boolean pingSuccess = false + def kvmagent_mem_threshold = null + def kvmagent_mem_hard_limit = null env.afterSimulator(KVMConstant.KVM_PING_PATH) { KVMAgentCommands.PingResponse rsp, HttpEntity e -> KVMAgentCommands.PingCmd cmd = JSONObjectUtil.toObject(e.getBody(), KVMAgentCommands.PingCmd.class) + kvmagent_mem_threshold = cmd.configs.get(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.getName()) + kvmagent_mem_hard_limit = cmd.configs.get(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName()) if (cmd.hostUuid == kvm1.uuid && !pingSuccess) { throw new RuntimeException("failure on purpose") @@ -110,6 +118,24 @@ class KVMPingCase extends SubCase { return rsp } + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("10G") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("2G") + } + + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.updateValue(SizeUtils.sizeStringToBytes("100M")) + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.updateValue(SizeUtils.sizeStringToBytes("200M")) + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("200M") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("100M") + } + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.updateValue(SizeUtils.sizeStringToBytes("5G")) + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.updateValue(SizeUtils.sizeStringToBytes("20G")) + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("20G") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("5G") + } + updateGlobalConfig { category = "configurableTimeout" name = ReconnectHostMsg.class.name @@ -458,6 +484,7 @@ class KVMPingCase extends SubCase { @Override void test() { bus = bean(CloudBus.class) + gcf = bean(GlobalConfigFacadeImpl.class) env.create { HostGlobalConfig.PING_HOST_INTERVAL.updateValue(1) diff --git a/test/src/test/resources/globalConfig/kvm.xml b/test/src/test/resources/globalConfig/kvm.xml index 5e20aefbe0f..6fb42631828 100755 --- a/test/src/test/resources/globalConfig/kvm.xml +++ b/test/src/test/resources/globalConfig/kvm.xml @@ -256,4 +256,20 @@ 3600 java.lang.Integer + + + kvm + kvmagent.physicalmemory.usage.alarm.threshold + The threshold for the physical memory usage of the kvmagent process, exceeding which an alarm will be triggered. + 2147483648 + java.lang.Long + + + + kvm + kvmagent.physicalmemory.usage.hardlimit + The hard limit for the physical memory usage of the kvmagent process, exceeding this value will trigger a kvmagent restart. + 10737418240 + java.lang.Long + From fb9e84e6e4091b018a45b0af5fe9fa7b86723e01 Mon Sep 17 00:00:00 2001 From: "shan.wu" Date: Mon, 20 Jan 2025 11:00:05 +0800 Subject: [PATCH 2/2] [agent]: skip host ping when dumping kvmagent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、skip host ping when dumping kvmagent 2、Wait for the kvmagent service to fully start before reconnecting to the host to avoid restarting twice. Resolves/Related: ZSTAC-72552 Change-Id: I676f656f6f6d6b6565656e737677666667767176 (cherry picked from commit 914853d45722c07c9626d1a85534141cdbf692ed) # Conflicts: # plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java