diff --git a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java index 3d5684914b0..49394e88b78 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java +++ b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java @@ -1,5 +1,7 @@ package org.zstack.compute.host; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.zstack.core.CoreGlobalProperty; import org.zstack.core.cloudbus.*; @@ -35,6 +37,7 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener, private Map trackers = new ConcurrentHashMap<>(); private static boolean alwaysStartRightNow = false; + private static final Cache<String, Long> skippedPingHostDeadline = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build(); @Autowired private DatabaseFacade dbf; @@ -124,6 +127,14 @@ private void track() { return; } + /* Look the deadline up exactly once: the cache entry can expire or be invalidated by the HOST_PING_CANCEL_SKIP handler between two reads, and unboxing a null deadline in the comparison would throw NullPointerException. */ + Long skipDeadline = skippedPingHostDeadline.getIfPresent(uuid); + if (skipDeadline != null && System.currentTimeMillis() / 1000 <= skipDeadline) { + logger.debug(String.format("skip tracking host[uuid:%s] this time, deadline %s", uuid, skipDeadline)); + continueToRunThisTimer(); + return; + } + PingHostMsg msg = new PingHostMsg(); msg.setHostUuid(uuid); bus.makeLocalServiceId(msg, HostConstant.SERVICE_ID); @@ -341,6 +352,7 @@ private HostReconnectTaskFactory getHostReconnectTaskFactory(String hvType) { public boolean start() { populateExtensions(); onHostStatusChange(); + onHostPingSkip(); HostGlobalConfig.PING_HOST_INTERVAL.installUpdateExtension((oldConfig, newConfig) -> { logger.debug(String.format("%s change from %s to %s, restart host trackers", @@ -386,6 +398,24 @@ protected void run(Map tokens, Object data) { }); } + private void onHostPingSkip() { + evtf.on(HostCanonicalEvents.HOST_PING_SKIP, new EventCallback() { + @Override + 
protected void run(Map tokens, Object data) { + HostCanonicalEvents.HostPingSkipData d = (HostCanonicalEvents.HostPingSkipData) data; + Long deadline = System.currentTimeMillis() / 1000 + d.getSkipTimeInSec(); + skippedPingHostDeadline.put(d.getHostUuid(), deadline); + } + }); + evtf.on(HostCanonicalEvents.HOST_PING_CANCEL_SKIP, new EventCallback() { + @Override + protected void run(Map tokens, Object data) { + HostCanonicalEvents.HostPingSkipData d = (HostCanonicalEvents.HostPingSkipData) data; + skippedPingHostDeadline.invalidate(d.getHostUuid()); + } + }); + } + @Override public boolean stop() { return true; diff --git a/conf/globalConfig/kvm.xml b/conf/globalConfig/kvm.xml index 771709113ce..f600dbb12bc 100755 --- a/conf/globalConfig/kvm.xml +++ b/conf/globalConfig/kvm.xml @@ -257,4 +257,20 @@ 1800 java.lang.Integer + + + kvm + kvmagent.physicalmemory.usage.alarm.threshold + The threshold for the physical memory usage of the kvmagent process, exceeding which an alarm will be triggered. + 2147483648 + java.lang.Long + + + + kvm + kvmagent.physicalmemory.usage.hardlimit + The hard limit for the physical memory usage of the kvmagent process, exceeding this value will trigger a kvmagent restart. 
+ 10737418240 + java.lang.Long + diff --git a/core/src/main/java/org/zstack/core/CoreManagerImpl.java b/core/src/main/java/org/zstack/core/CoreManagerImpl.java index 909a0b25405..08a86f57c15 100644 --- a/core/src/main/java/org/zstack/core/CoreManagerImpl.java +++ b/core/src/main/java/org/zstack/core/CoreManagerImpl.java @@ -98,6 +98,9 @@ private void handle(GetLocalTaskMsg msg) { GetLocalTaskReply reply = new GetLocalTaskReply(); Map results = msg.getSyncSignatures().stream() .collect(Collectors.toMap(Function.identity(), thdf::getChainTaskInfo)); + if (msg.isOnlyRunningTask()) { + results.values().forEach(c -> c.getPendingTask().clear()); + } reply.setResults(results); bus.reply(msg, reply); } diff --git a/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java b/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java index 9e41fd36b3b..6a7675b52ac 100644 --- a/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java +++ b/header/src/main/java/org/zstack/header/core/GetLocalTaskMsg.java @@ -6,6 +6,15 @@ public class GetLocalTaskMsg extends NeedReplyMessage { private List syncSignatures; + private boolean onlyRunningTask; + + public boolean isOnlyRunningTask() { + return onlyRunningTask; + } + + public void setOnlyRunningTask(boolean onlyRunningTask) { + this.onlyRunningTask = onlyRunningTask; + } public void setSyncSignatures(List syncSignatures) { this.syncSignatures = syncSignatures; diff --git a/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java b/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java index a0ecc4c0046..3350d926a1e 100755 --- a/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java +++ b/header/src/main/java/org/zstack/header/host/HostCanonicalEvents.java @@ -23,6 +23,10 @@ public class HostCanonicalEvents { public static final String HOST_PHYSICAL_DISK_STATUS_ABNORMAL = "/host/physicalDisk/status/abnormal"; public static final String HOST_PHYSICAL_DISK_INSERT_TRIGGERED = 
"/host/physicalDisk/insert/triggered"; public static final String HOST_PHYSICAL_DISK_REMOVE_TRIGGERED = "/host/physicalDisk/remove/triggered"; + public static final String HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ABNORMAL = "/host/process/physicalMemory/usage/abnormal"; + public static final String HOST_PING_SKIP = "/host/ping/skip"; + public static final String HOST_PING_CANCEL_SKIP = "/host/ping/cancel/skip"; + @NeedJsonSchema public static class HostPhysicalCpuStatusAbnormalData { @@ -399,4 +403,66 @@ public void setInterfaceStatus(String interfaceStatus) { this.interfaceStatus = interfaceStatus; } } + + @NeedJsonSchema + public static class HostProcessPhysicalMemoryUsageAlarmData { + private String hostUuid; + private String pid; + private String processName; + private String memoryUsage; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + public String getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(String memoryUsage) { + this.memoryUsage = memoryUsage; + } + } + + @NeedJsonSchema + public static class HostPingSkipData { + private String hostUuid; + private int skipTimeInSec; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public int getSkipTimeInSec() { + return skipTimeInSec; + } + + public void setSkipTimeInSec(int skipTimeInSec) { + this.skipTimeInSec = skipTimeInSec; + } + } } diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java index 2c8bdc4a3e4..c6fe4ca154d 100755 --- 
a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java @@ -330,6 +330,7 @@ public void setQemuVersion(String qemuVersion) { public static class PingCmd extends AgentCommand { public String hostUuid; + public Map configs; } public static class PingResponse extends AgentResponse { @@ -4127,4 +4128,83 @@ public void setImageData(String imageData) { this.imageData = imageData; } } + + public static class HostProcessPhysicalMemoryUsageAlarmCmd { + private String hostUuid; + private String pid; + private String processName; + private long memoryUsage; + private Map additionalProperties = new HashMap<>(); + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + public long getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(long memoryUsage) { + this.memoryUsage = memoryUsage; + } + + public Map getAdditionalProperties() { + return additionalProperties; + } + + public void setAdditionalProperties(Map additionalProperties) { + this.additionalProperties = additionalProperties; + } + } + + public static class HostKvmagentStatusCmd { + private String status; + private String hostUuid; + private long memoryUsage; + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public long getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(long memoryUsage) { + this.memoryUsage = memoryUsage; + } + } + } diff --git 
a/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java index 12a0d0a77c4..65edf86fa64 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java @@ -109,6 +109,9 @@ public interface KVMConstant { String KVM_BLOCK_COMMIT_VOLUME_PATH = "/vm/volume/blockcommit"; String TAKE_VM_CONSOLE_SCREENSHOT_PATH = "/vm/console/screenshot"; + String HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ALARM_PATH = "/host/process/physicalMemory/usage/alarm"; + String HOST_KVMAGENT_STATUS_PATH = "/host/kvmagent/status"; + String KVM_AGENT_OWNER = "kvm"; String ALI_REPO = "ali"; diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java index 17ca18ec2a5..71e9cc8d20d 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMGlobalConfig.java @@ -1,6 +1,5 @@ package org.zstack.kvm; -import org.zstack.core.GlobalProperty; import org.zstack.core.config.GlobalConfig; import org.zstack.core.config.GlobalConfigDef; import org.zstack.core.config.GlobalConfigDefinition; @@ -130,4 +129,15 @@ public class KVMGlobalConfig { @GlobalConfigDef(defaultValue = "none", description = "enable host ksm") @BindResourceConfig({HostVO.class}) public static GlobalConfig HOST_KSM = new GlobalConfig(CATEGORY, "host.ksm"); + + @GlobalConfigValidation(validValues = {"true", "false"}) + @GlobalConfigDef(defaultValue = "false", description = "restart kvm host libvirtd service or not") + @BindResourceConfig({HostVO.class, ClusterVO.class}) + public static GlobalConfig RECONNECT_HOST_RESTART_LIBVIRTD_SERVICE = new GlobalConfig(CATEGORY, "reconnect.host.restart.libvirtd.service"); + + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD = new GlobalConfig(CATEGORY, 
"kvmagent.physicalmemory.usage.alarm.threshold"); + + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT = new GlobalConfig(CATEGORY, "kvmagent.physicalmemory.usage.hardlimit"); } diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java index 40dd23224c1..9cf4a1ac697 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java @@ -21,8 +21,10 @@ import org.zstack.core.Platform; import org.zstack.core.agent.AgentConstant; import org.zstack.core.ansible.*; +import org.zstack.core.asyncbatch.While; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.cloudbus.CloudBusGlobalProperty; +import org.zstack.core.cloudbus.ResourceDestinationMaker; import org.zstack.core.componentloader.PluginRegistry; import org.zstack.core.db.Q; import org.zstack.core.db.SQL; @@ -33,6 +35,7 @@ import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.core.workflow.ShareFlow; +import org.zstack.core.workflow.SimpleFlowChain; import org.zstack.header.Constants; import org.zstack.header.allocator.DesignatedAllocateHostMsg; import org.zstack.header.allocator.HostAllocatorConstant; @@ -40,10 +43,9 @@ import org.zstack.header.cluster.ClusterInventory; import org.zstack.header.cluster.ClusterVO; import org.zstack.header.cluster.ReportHostCapacityMessage; -import org.zstack.header.core.AsyncLatch; -import org.zstack.header.core.Completion; -import org.zstack.header.core.NoErrorCompletion; -import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.core.*; +import org.zstack.header.core.progress.ChainInfo; +import org.zstack.header.core.progress.TaskInfo; import org.zstack.header.core.progress.TaskProgressRange; import org.zstack.header.core.workflow.*; import org.zstack.header.errorcode.ErrorCode; @@ -107,6 
+109,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -158,6 +161,8 @@ public class KVMHost extends HostBase implements Host { private TimeHelper timeHelper; @Autowired private AccountManager accountMgr; + @Autowired + private ResourceDestinationMaker destMaker; private KVMHostContext context; @@ -679,11 +684,229 @@ protected void handleLocalMessage(Message msg) { handle((CommitVolumeOnHypervisorMsg) msg); } else if (msg instanceof TakeVmConsoleScreenshotMsg) { handle((TakeVmConsoleScreenshotMsg) msg); + } else if (msg instanceof RestartKvmAgentMsg) { + handle((RestartKvmAgentMsg) msg); } else { super.handleLocalMessage(msg); } } + private void handle(RestartKvmAgentMsg msg) { + RestartKvmAgentReply reply = new RestartKvmAgentReply(); + thdf.singleFlightSubmit(new SingleFlightTask(msg) + .setSyncSignature(String.format("restart-kvmagent-on-host-%s", msg.getHostUuid())) + .run(completion -> { + if (!destMaker.isManagedByUs(msg.getHostUuid())) { + completion.fail(operr("host %s is not managed by current mn node", msg.getHostUuid())); + return; + } + + restartKvmAgentOnHost(msg.isForce(), new Completion(completion) { + @Override + public void success() { + completion.success(null); + } + + @Override + public void fail(ErrorCode errorCode) { + completion.fail(errorCode); + } + }); + }).done(result -> { + if (!result.isSuccess()) { + reply.setError(result.getErrorCode()); + } + bus.reply(msg, reply); + }) + ); + } + + private void noRunningTaskOnHost(ReturnValueCompletion completion) { + String syncSignature = Host.buildId(self.getUuid()); + List nodeUuids = destMaker.getAllNodeInfo().stream().map(ResourceDestinationMaker.NodeInfo::getNodeUuid) + .collect(Collectors.toList()); + + AtomicBoolean queueIsEmpty = new AtomicBoolean(true); + new While<>(nodeUuids).step((mnId, compl) -> 
{ + GetLocalTaskMsg gmsg = new GetLocalTaskMsg(); + gmsg.setOnlyRunningTask(true); + gmsg.setSyncSignatures(Collections.singletonList(syncSignature)); + bus.makeServiceIdByManagementNodeId(gmsg, CoreConstant.SERVICE_ID, mnId); + bus.send(gmsg, new CloudBusCallBack(compl) { + private String buildTaskInfo(List tasks) { + return tasks.stream().map(TaskInfo::getContext).collect(Collectors.joining("\n")); + } + + @Override + public void run(MessageReply r) { + if (!r.isSuccess()) { + compl.addError(r.getError()); + compl.allDone(); + return; + } + GetLocalTaskReply gr = r.castReply(); + ChainInfo chainInfo = gr.getResults().get(syncSignature); + if (!chainInfo.getRunningTask().isEmpty()) { + logger.debug(String.format("%s tasks exist on the running task of host %s, mnNodeId %s: %s...", chainInfo.getRunningTask().size(), + self.getUuid(), mnId, buildTaskInfo(chainInfo.getRunningTask()))); + queueIsEmpty.set(false); + compl.allDone(); + return; + } + compl.done(); + } + }); + }, 2).run(new WhileDoneCompletion(completion) { + @Override + public void done(ErrorCodeList errorCodeList) { + if (!errorCodeList.getCauses().isEmpty()) { + completion.fail(errorCodeList); + return; + } + completion.success(queueIsEmpty.get()); + } + }); + } + + private void restartKvmAgentOnHost(boolean force, Completion completion) { + SimpleFlowChain chain = new SimpleFlowChain(); + chain.setChainName("try-restart-kvmagent-on-host-" + self.getUuid()); + chain.then(new Flow() { + // changing the host connection status to 'Connecting' is to prevent sending HTTP requests to the kvmagent, + // which needs to be done before checking the host task queue + String __name__ = "change host state to connecting"; + + @Override + public void run(FlowTrigger trigger, Map data) { + if (self.getStatus() != HostStatus.Connected) { + trigger.fail(operr("host %s is not connected, skip to restart kvmagent", self.getUuid())); + return; + } + changeConnectionState(HostStatusEvent.connecting); + trigger.next(); + } + + 
@Override + public void rollback(FlowRollback trigger, Map data) { + if (self.getStatus() == HostStatus.Connecting) { + changeConnectionState(HostStatusEvent.connected); + } + trigger.rollback(); + } + }).then(new NoRollbackFlow() { + String __name__ = "check if the host task queue is empty"; + @Override + public boolean skip(Map data) { + return force; + } + + @Override + public void run(FlowTrigger trigger, Map data) { + noRunningTaskOnHost(new ReturnValueCompletion(trigger) { + @Override + public void success(Boolean noTask) { + if (noTask) { + trigger.next(); + } else { + trigger.fail(operr("running task exists on host %s", self.getUuid())); + } + } + + @Override + public void fail(ErrorCode errorCode) { + trigger.fail(errorCode); + } + }); + } + }).then(new NoRollbackFlow() { + String __name__ = "restart kvmagent on host " + self.getUuid(); + @Override + public boolean skip(Map data) { + return CoreGlobalProperty.UNIT_TEST_ON; + } + + @Override + public void run(FlowTrigger trigger, Map data) { + SshShell sshShell = new SshShell(); + sshShell.setHostname(getSelf().getManagementIp()); + sshShell.setUsername(getSelf().getUsername()); + sshShell.setPassword(getSelf().getPassword()); + sshShell.setPort(getSelf().getPort()); + SshResult ret = sshShell.runCommand("sudo service zstack-kvmagent restart"); + + if (ret.isSshFailure() || ret.getReturnCode() != 0) { + trigger.fail(operr(ret.getExitErrorMessage())); + } else { + trigger.next(); + } + } + }).then(new NoRollbackFlow() { + String __name__ = "wait for kvmagent to be ready"; + + @Override + public void run(FlowTrigger trigger, Map data) { + int retryCount = 60; + While.makeRetryWhile(retryCount).each((currentStep, compl) -> { + PingCmd cmd = new PingCmd(); + cmd.hostUuid = self.getUuid(); + restf.asyncJsonPost(pingPath, cmd, new JsonAsyncRESTCallback(compl) { + @Override + public void fail(ErrorCode err) { + try { + if (currentStep < retryCount) { + TimeUnit.SECONDS.sleep(1); + } + } catch (InterruptedException 
e) { + Thread.currentThread().interrupt(); + } finally { + compl.addError(err); + compl.done(); + } + } + + @Override + public void success(PingResponse ret) { + compl.allDone(); + } + + @Override + public Class getReturnClass() { + return PingResponse.class; + } + }, TimeUnit.SECONDS, HostGlobalConfig.PING_HOST_TIMEOUT.value(Long.class)); + }).run(new WhileDoneCompletion(trigger) { + @Override + public void done(ErrorCodeList errorCodeList) { + if (errorCodeList.getCauses().size() == retryCount) { + logger.debug("waiting for kvmagent to start timeout: " + errorCodeList.getCauses().get(0).getDetails()); + } + trigger.next(); + } + }); + } + }).then(new NoRollbackFlow() { + String __name__ = String.format("reconnect host %s after restart kvmagent", self.getUuid()); + + public void run(FlowTrigger trigger, Map data) { + ReconnectHostMsg rmsg = new ReconnectHostMsg(); + rmsg.setHostUuid(self.getUuid()); + bus.makeTargetServiceIdByResourceUuid(rmsg, HostConstant.SERVICE_ID, self.getUuid()); + bus.send(rmsg); + trigger.next(); + } + }).done(new FlowDoneHandler(completion) { + @Override + public void handle(Map data) { + completion.success(); + } + }).error(new FlowErrorHandler(completion) { + @Override + public void handle(ErrorCode errCode, Map data) { + completion.fail(errCode); + } + }).start(); + } + private void handle(TakeVmConsoleScreenshotMsg msg) { TakeVmConsoleScreenshotReply reply = new TakeVmConsoleScreenshotReply(); thdf.singleFlightSubmit(new SingleFlightTask(msg) @@ -4596,6 +4819,7 @@ public void setup() { public void run(FlowTrigger trigger, Map data) { PingCmd cmd = new PingCmd(); cmd.hostUuid = self.getUuid(); + cmd.configs = KVMHostFactory.configs; restf.asyncJsonPost(pingPath, cmd, new JsonAsyncRESTCallback(trigger) { @Override public void fail(ErrorCode err) { diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java index 3784f2a76a9..c0f7db1096d 100755 --- 
a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java @@ -7,10 +7,9 @@ import org.zstack.compute.vm.CrashStrategy; import org.zstack.compute.vm.VmGlobalConfig; import org.zstack.compute.vm.VmNicManager; -import org.zstack.compute.vm.VmNicManagerImpl; import org.zstack.header.errorcode.ErrorCode; -import org.zstack.header.network.l2.L2NetworkRealizationExtensionPoint; -import org.zstack.header.network.l2.VSwitchType; +import org.zstack.core.config.*; +import org.zstack.core.config.schema.GuestOsCharacter; import org.zstack.header.tag.SystemTagInventory; import org.zstack.header.tag.SystemTagLifeCycleListener; import org.zstack.header.tag.SystemTagValidator; @@ -27,7 +26,6 @@ import org.zstack.core.config.GlobalConfigUpdateExtensionPoint; import org.zstack.core.config.GlobalConfigValidatorExtensionPoint; import org.zstack.core.config.schema.GuestOsCategory; -import org.zstack.core.config.schema.GuestOsCharacter; import org.zstack.core.db.DatabaseFacade; import org.zstack.core.db.Q; import org.zstack.core.db.SQL; @@ -56,12 +54,11 @@ import org.zstack.header.volume.*; import org.zstack.kvm.KVMAgentCommands.ReconnectMeCmd; import org.zstack.kvm.KVMAgentCommands.TransmitVmOperationToMnCmd; -import org.zstack.resourceconfig.ResourceConfigUpdateExtensionPoint; -import org.zstack.resourceconfig.ResourceConfigValidatorExtensionPoint; import org.zstack.utils.CollectionUtils; import org.zstack.utils.IpRangeSet; import org.zstack.utils.SizeUtils; import org.zstack.utils.Utils; +import org.zstack.utils.data.SizeUnit; import org.zstack.utils.form.Form; import org.zstack.utils.function.Function; import org.zstack.utils.function.ValidateFunction; @@ -110,6 +107,7 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory public static Map allGuestOsCharacter = new ConcurrentHashMap<>(); private final Map socketTimeoutMap = new ConcurrentHashMap<>(); + public static final Map configs = 
Collections.synchronizedMap(new HashMap<>()); static { RAW_FORMAT.newFormatInputOutputMapping(hypervisorType, QCOW2_FORMAT.toString()); @@ -141,8 +139,10 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory private VmInstanceDeviceManager vidm; @Autowired private VmNicManager vmNicManager; - + @Autowired + private GlobalConfigFacade gcf; private Future checkSocketChannelTimeoutThread; + public static int skipHostPingTimeWhenKvmagentBusy = 300; @Override public HostVO createHost(HostVO vo, AddHostMessage msg) { @@ -355,6 +355,21 @@ private void deployAnsibleModule() { asf.deployModule(KVMConstant.ANSIBLE_MODULE_PATH, KVMConstant.ANSIBLE_PLAYBOOK_NAME); } + + private void processKvmagentPhysicalMemUsageAbnormal(KVMAgentCommands.HostProcessPhysicalMemoryUsageAlarmCmd cmd) { + if (cmd.getMemoryUsage() <= gcf.getConfigValue(KVMGlobalConfig.CATEGORY, + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName(), Long.class)) { + return; + } + + logger.debug("The zstack-kvmagent service has exceeded the hard limit for physical memory usage, " + + "and we will try restart it later"); + RestartKvmAgentMsg restartKvmAgentMsg = new RestartKvmAgentMsg(); + restartKvmAgentMsg.setHostUuid(cmd.getHostUuid()); + bus.makeTargetServiceIdByResourceUuid(restartKvmAgentMsg, HostConstant.SERVICE_ID, restartKvmAgentMsg.getHostUuid()); + bus.send(restartKvmAgentMsg); + } + @Override public boolean start() { deployAnsibleModule(); @@ -400,6 +415,20 @@ public void validateGlobalConfig(String category, String name, String oldValue, } } }); + configs.put(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.getName(), KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.value(Long.class)); + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.installUpdateExtension(new GlobalConfigUpdateExtensionPoint() { + @Override + public void updateGlobalConfig(GlobalConfig oldConfig, GlobalConfig newConfig) { + 
configs.put(newConfig.getName(), newConfig.value(Long.class)); + } + }); + configs.put(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName(), KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.value(Long.class)); + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.installUpdateExtension(new GlobalConfigUpdateExtensionPoint() { + @Override + public void updateGlobalConfig(GlobalConfig oldConfig, GlobalConfig newConfig) { + configs.put(newConfig.getName(), newConfig.value(Long.class)); + } + }); ResourceConfig resourceConfig = rcf.getResourceConfig(KVMGlobalConfig.VM_CPU_HYPERVISOR_FEATURE.getIdentity()); resourceConfig.installValidatorExtension((resourceUuid, oldValue, newValue) -> { if (Boolean.TRUE.toString().equals(newValue)) { @@ -503,6 +532,43 @@ public String handleSyncHttpCall(KVMAgentCommands.ReportVmRebootEventCmd cmd) { return null; }); + + restf.registerSyncHttpCallHandler(KVMConstant.HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ALARM_PATH, KVMAgentCommands.HostProcessPhysicalMemoryUsageAlarmCmd.class, cmd -> { + HostCanonicalEvents.HostProcessPhysicalMemoryUsageAlarmData data = new HostCanonicalEvents.HostProcessPhysicalMemoryUsageAlarmData(); + data.setHostUuid(cmd.getHostUuid()); + data.setPid(cmd.getPid()); + data.setMemoryUsage(String.format("%s MB", cmd.getMemoryUsage() / 1048576)); + data.setProcessName(cmd.getProcessName()); + evf.fire(HostCanonicalEvents.HOST_PROCESS_PHYSICAL_MEMORY_USAGE_ABNORMAL, data); + + /* The process name is supplied by the agent and may be absent; switching on a null String throws NullPointerException, so compare with equals() instead. */ + /* Only the zstack-kvmagent process has a follow-up action; any other (or missing) name is just logged. */ + if ("zstack-kvmagent".equals(cmd.getProcessName())) { + processKvmagentPhysicalMemUsageAbnormal(cmd); + } else { + logger.debug(String.format("unknown process name[%s] in host[uuid:%s]", cmd.getProcessName(), cmd.getHostUuid())); + } + + return null; + }); + + restf.registerSyncHttpCallHandler(KVMConstant.HOST_KVMAGENT_STATUS_PATH, KVMAgentCommands.HostKvmagentStatusCmd.class, cmd -> { + if ("busy".equals(cmd.getStatus())) { + HostCanonicalEvents.HostPingSkipData data = new 
HostCanonicalEvents.HostPingSkipData(); + data.setHostUuid(cmd.getHostUuid()); + // this will skip host ping sometime if kvmagent busy + data.setSkipTimeInSec(skipHostPingTimeWhenKvmagentBusy * (int)(cmd.getMemoryUsage() / SizeUnit.GIGABYTE.toByte(4) + 1)); + evf.fire(HostCanonicalEvents.HOST_PING_SKIP, data); + } else if ("available".equals(cmd.getStatus())) { + HostCanonicalEvents.HostPingSkipData data = new HostCanonicalEvents.HostPingSkipData(); + data.setHostUuid(cmd.getHostUuid()); + evf.fire(HostCanonicalEvents.HOST_PING_CANCEL_SKIP, data); + } else { + logger.debug("unknown kvmagent status: " + cmd.getStatus()); + } + return null; + }); + KVMSystemTags.CHECK_CLUSTER_CPU_MODEL.installValidator(((resourceUuid, resourceType, systemTag) -> { String check = KVMSystemTags.CHECK_CLUSTER_CPU_MODEL.getTokenByTag(systemTag, KVMSystemTags.CHECK_CLUSTER_CPU_MODEL_TOKEN); diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java new file mode 100644 index 00000000000..008b653af7f --- /dev/null +++ b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentMsg.java @@ -0,0 +1,26 @@ +package org.zstack.kvm; + +import org.zstack.header.host.HostMessage; +import org.zstack.header.message.NeedReplyMessage; + +public class RestartKvmAgentMsg extends NeedReplyMessage implements HostMessage { + String hostUuid; + boolean force; + + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + + @Override + public String getHostUuid() { + return hostUuid; + } +} diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java new file mode 100644 index 00000000000..e26f22981e0 --- /dev/null +++ b/plugin/kvm/src/main/java/org/zstack/kvm/RestartKvmAgentReply.java @@ -0,0 +1,6 @@ +package 
org.zstack.kvm; + +import org.zstack.header.message.MessageReply; + +public class RestartKvmAgentReply extends MessageReply { +} diff --git a/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy b/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy index 25697e33f4d..7948dadb3f4 100755 --- a/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/kvm/host/KVMPingCase.groovy @@ -6,6 +6,7 @@ import org.zstack.compute.host.HostManagerImpl import org.zstack.compute.host.HostReconnectTask import org.zstack.compute.host.HostTrackImpl import org.zstack.core.cloudbus.CloudBus +import org.zstack.core.config.GlobalConfigFacadeImpl import org.zstack.core.db.Q import org.zstack.core.db.SQL import org.zstack.header.core.NoErrorCompletion @@ -13,6 +14,7 @@ import org.zstack.header.errorcode.SysErrors import org.zstack.header.host.* import org.zstack.kvm.KVMAgentCommands import org.zstack.kvm.KVMConstant +import org.zstack.kvm.KVMGlobalConfig import org.zstack.kvm.KVMReconnectHostTask import org.zstack.sdk.ClusterInventory import org.zstack.sdk.HostInventory @@ -20,6 +22,7 @@ import org.zstack.test.integration.kvm.KvmTest import org.zstack.testlib.EnvSpec import org.zstack.testlib.SubCase import org.zstack.utils.FieldUtils +import org.zstack.utils.SizeUtils import org.zstack.utils.gson.JSONObjectUtil import java.util.concurrent.TimeUnit @@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit class KVMPingCase extends SubCase { EnvSpec env CloudBus bus + GlobalConfigFacadeImpl gcf @Override void clean() { @@ -98,8 +102,12 @@ class KVMPingCase extends SubCase { boolean pingSuccess = false + def kvmagent_mem_threshold = null + def kvmagent_mem_hard_limit = null env.afterSimulator(KVMConstant.KVM_PING_PATH) { KVMAgentCommands.PingResponse rsp, HttpEntity e -> KVMAgentCommands.PingCmd cmd = JSONObjectUtil.toObject(e.getBody(), KVMAgentCommands.PingCmd.class) + 
kvmagent_mem_threshold = cmd.configs.get(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.getName()) + kvmagent_mem_hard_limit = cmd.configs.get(KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.getName()) if (cmd.hostUuid == kvm1.uuid && !pingSuccess) { throw new RuntimeException("failure on purpose") @@ -110,6 +118,24 @@ class KVMPingCase extends SubCase { return rsp } + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("10G") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("2G") + } + + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.updateValue(SizeUtils.sizeStringToBytes("100M")) + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.updateValue(SizeUtils.sizeStringToBytes("200M")) + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("200M") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("100M") + } + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_ALARM_THRESHOLD.updateValue(SizeUtils.sizeStringToBytes("5G")) + KVMGlobalConfig.KVMAGENT_PHYSICAL_MEMORY_USAGE_HARD_LIMIT.updateValue(SizeUtils.sizeStringToBytes("20G")) + retryInSecs { + assert kvmagent_mem_hard_limit == SizeUtils.sizeStringToBytes("20G") + assert kvmagent_mem_threshold == SizeUtils.sizeStringToBytes("5G") + } + updateGlobalConfig { category = "configurableTimeout" name = ReconnectHostMsg.class.name @@ -458,6 +484,7 @@ class KVMPingCase extends SubCase { @Override void test() { bus = bean(CloudBus.class) + gcf = bean(GlobalConfigFacadeImpl.class) env.create { HostGlobalConfig.PING_HOST_INTERVAL.updateValue(1) diff --git a/test/src/test/resources/globalConfig/kvm.xml b/test/src/test/resources/globalConfig/kvm.xml index 5e20aefbe0f..6fb42631828 100755 --- a/test/src/test/resources/globalConfig/kvm.xml +++ b/test/src/test/resources/globalConfig/kvm.xml @@ -256,4 +256,20 @@ 3600 java.lang.Integer + + + kvm + kvmagent.physicalmemory.usage.alarm.threshold + The threshold for the 
physical memory usage of the kvmagent process, exceeding which an alarm will be triggered. + 2147483648 + java.lang.Long + + + + kvm + kvmagent.physicalmemory.usage.hardlimit + The hard limit for the physical memory usage of the kvmagent process, exceeding this value will trigger a kvmagent restart. + 10737418240 + java.lang.Long +